ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [25/31] incubator-ignite git commit: ignite-471-2: huge merge from sprint-6
Date Wed, 10 Jun 2015 16:27:48 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 10b84e2..adea9e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -54,7 +54,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     private GridDhtPartitionTopology top;
 
     /** Preloader. */
-    protected GridCachePreloader<K, V> preldr;
+    protected GridCachePreloader preldr;
 
     /** Multi tx future holder. */
     private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> multiTxHolder = new ThreadLocal<>();
@@ -75,7 +75,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
         super(ctx, ctx.config().getStartSize());
 
-        top = new GridDhtPartitionTopologyImpl<>(ctx);
+        top = new GridDhtPartitionTopologyImpl(ctx);
     }
 
     /**
@@ -87,7 +87,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) {
         super(ctx, map);
 
-        top = new GridDhtPartitionTopologyImpl<>(ctx);
+        top = new GridDhtPartitionTopologyImpl(ctx);
     }
 
     /** {@inheritDoc} */
@@ -168,17 +168,17 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /** {@inheritDoc} */
-    @Override public GridCachePreloader<K, V> preloader() {
+    @Override public GridCachePreloader preloader() {
         return preldr;
     }
 
     /**
      * @return DHT preloader.
      */
-    public GridDhtPreloader<K, V> dhtPreloader() {
+    public GridDhtPreloader dhtPreloader() {
         assert preldr instanceof GridDhtPreloader;
 
-        return (GridDhtPreloader<K, V>)preldr;
+        return (GridDhtPreloader)preldr;
     }
 
     /**
@@ -932,6 +932,21 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /**
+     * @param expVer Expected topology version.
+     * @param curVer Current topology version.
+     * @return {@code True} if cache affinity changed and operation should be remapped.
+     */
+    protected final boolean needRemap(AffinityTopologyVersion expVer, AffinityTopologyVersion curVer) {
+        if (expVer.equals(curVer))
+            return false;
+
+        Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer);
+        Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer);
+
+        return !cacheNodes0.equals(cacheNodes1);
+    }
+
+    /**
      * @param primary If {@code true} includes primary entries.
      * @param backup If {@code true} includes backup entries.
      * @return Local entries iterator.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index c9a7af8..89b85c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -292,12 +292,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
         return ret;
     }
 
-    /**
-     * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition.
-     */
+    /** {@inheritDoc} */
     @Override public void onUnlock() {
-        super.onUnlock();
-
         locPart.onUnlock();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index f6f930e..742fbfe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -295,6 +295,11 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                     if (info == null)
                         continue;
 
+                    boolean addReader = (!e.deleted() && k.getValue() && !skipVals);
+
+                    if (addReader)
+                        e.unswap(false);
+
                     // Register reader. If there are active transactions for this entry,
                     // then will wait for their completion before proceeding.
                     // TODO: GG-4003:
@@ -303,8 +308,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                     // TODO: To fix, check that reader is contained in the list of readers once
                     // TODO: again after the returned future completes - if not, try again.
                     // TODO: Also, why is info read before transactions are complete, and not after?
-                    IgniteInternalFuture<Boolean> f = (!e.deleted() && k.getValue() && !skipVals) ?
-                        e.addReader(reader, msgId, topVer) : null;
+                    IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null;
 
                     if (f != null) {
                         if (txFut == null)
@@ -317,6 +321,9 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
 
                     break;
                 }
+                catch (IgniteCheckedException err) {
+                    return new GridFinishedFuture<>(err);
+                }
                 catch (GridCacheEntryRemovedException ignore) {
                     if (log.isDebugEnabled())
                         log.debug("Got removed entry when getting a DHT value: " + e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index c57eded..bdaa552 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -47,7 +47,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
 /**
  * Cache lock future.
  */
-public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion {
     /** */
     private static final long serialVersionUID = 0L;
@@ -60,7 +60,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
 
     /** Cache registry. */
     @GridToStringExclude
-    private GridCacheContext<K, V> cctx;
+    private GridCacheContext<?, ?> cctx;
 
     /** Near node ID. */
     private UUID nearNodeId;
@@ -151,7 +151,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
      * @param skipStore Skip store flag.
      */
     public GridDhtLockFuture(
-        GridCacheContext<K, V> cctx,
+        GridCacheContext<?, ?> cctx,
         UUID nearNodeId,
         GridCacheVersion nearLockVer,
         @NotNull AffinityTopologyVersion topVer,
@@ -221,7 +221,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
      * @param cacheCtx Cache context.
      * @param invalidPart Partition to retry.
      */
-    void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) {
+    void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int invalidPart) {
         invalidParts.add(invalidPart);
 
         // Register invalid partitions with transaction.
@@ -1170,7 +1170,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
          * @param entries Entries to check.
          */
         @SuppressWarnings({"ForLoopReplaceableByForEach"})
-        private void evictReaders(GridCacheContext<K, V> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
+        private void evictReaders(GridCacheContext<?, ?> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
             @Nullable List<GridDhtCacheEntry> entries) {
             if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty())
                 return;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 073e0e7..374ab87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -41,7 +41,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  * Partition topology.
  */
 @GridToStringExclude
-class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
+class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** If true, then check consistency. */
     private static final boolean CONSISTENCY_CHECK = false;
 
@@ -49,7 +49,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
     private static final boolean FULL_MAP_DEBUG = false;
 
     /** Context. */
-    private final GridCacheContext<K, V> cctx;
+    private final GridCacheContext<?, ?> cctx;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -85,7 +85,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
     /**
      * @param cctx Context.
      */
-    GridDhtPartitionTopologyImpl(GridCacheContext<K, V> cctx) {
+    GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx) {
         assert cctx != null;
 
         this.cctx = cctx;
@@ -239,7 +239,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
                 removeNode(exchId.nodeId());
 
             // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldest(cctx.shared(), topVer);
+            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+            assert oldest != null;
 
             if (log.isDebugEnabled())
                 log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -247,7 +249,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
             long updateSeq = this.updateSeq.incrementAndGet();
 
             // If this is the oldest node.
-            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId())) {
+            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) {
                 if (node2part == null) {
                     node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -274,7 +276,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
             if (cctx.rebalanceEnabled()) {
                 for (int p = 0; p < num; p++) {
                     // If this is the first node in grid.
-                    boolean added = exchFut.isCacheAdded(cctx.cacheId());
+                    boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
 
                     if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()) && exchId.isJoined()) || added) {
                         assert exchId.isJoined() || added;
@@ -604,7 +606,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
 
         try {
             return new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get(),
-                F.viewReadOnly(locParts, CU.<K, V>part2state()), true);
+                F.viewReadOnly(locParts, CU.part2state()), true);
         }
         finally {
             lock.readLock().unlock();
@@ -660,13 +662,15 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
      * @return List of nodes for the partition.
      */
     private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
-        Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null;
+        Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
 
         lock.readLock().lock();
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
-                ", allIds=" + allIds + ", node2part=" + node2part + ']';
+                ", allIds=" + allIds +
+                ", node2part=" + node2part +
+                ", cache=" + cctx.name() + ']';
 
             Collection<UUID> nodeIds = part2node.get(p);
 
@@ -738,7 +742,11 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
-                ", locNodeId=" + cctx.localNode().id() + ", locName=" + cctx.gridName() + ']';
+                ", cache=" + cctx.name() +
+                ", started=" + cctx.started() +
+                ", stopping=" + stopping +
+                ", locNodeId=" + cctx.localNode().id() +
+                ", locName=" + cctx.gridName() + ']';
 
             GridDhtPartitionFullMap m = node2part;
 
@@ -756,6 +764,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
 
+        assert partMap != null;
+
         lock.writeLock().lock();
 
         try {
@@ -1024,7 +1034,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
         assert nodeId.equals(cctx.nodeId());
 
         // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldest(cctx, topVer);
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+        assert oldest != null;
 
         // If this node became the oldest node.
         if (oldest.id().equals(cctx.nodeId())) {
@@ -1074,7 +1086,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
         assert nodeId != null;
         assert lock.writeLock().isHeldByCurrentThread();
 
-        ClusterNode oldest = CU.oldest(cctx, topVer);
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+        assert oldest != null;
 
         ClusterNode loc = cctx.localNode();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 26eef50..703daf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -81,7 +81,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        preldr = new GridDhtPreloader<>(ctx);
+        preldr = new GridDhtPreloader(ctx);
 
         preldr.start();
 
@@ -518,7 +518,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             return;
         }
 
-        // Group lock can be only started from local node, so we never start group lock transaction on remote node.
         IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null);
 
         // Register listener just so we print out errors.
@@ -534,8 +533,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) {
         assert nodeId != null;
         assert res != null;
-        GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(),
-            res.futureId());
+        GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId());
 
         if (fut == null) {
             if (log.isDebugEnabled())
@@ -604,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
         assert tx != null;
 
-        GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(
+        GridDhtLockFuture fut = new GridDhtLockFuture(
             ctx,
             tx.nearNodeId(),
             tx.nearXidVersion(),
@@ -669,7 +667,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      * @return Future.
      */
     public IgniteInternalFuture<GridNearLockResponse> lockAllAsync(
-        final GridCacheContext<K, V> cacheCtx,
+        final GridCacheContext<?, ?> cacheCtx,
         final ClusterNode nearNode,
         final GridNearLockRequest req,
         @Nullable final CacheEntryPredicate[] filter0) {
@@ -719,26 +717,57 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                         if (filter == null)
                             filter = req.filter();
 
-                        GridDhtLockFuture<K, V> fut = null;
+                        GridDhtLockFuture fut = null;
 
                         if (!req.inTx()) {
-                            fut = new GridDhtLockFuture<>(ctx,
-                                nearNode.id(),
-                                req.version(),
-                                req.topologyVersion(),
-                                cnt,
-                                req.txRead(),
-                                req.needReturnValue(),
-                                req.timeout(),
-                                tx,
-                                req.threadId(),
-                                req.accessTtl(),
-                                filter,
-                                req.skipStore());
+                            GridDhtPartitionTopology top = null;
+
+                            if (req.firstClientRequest()) {
+                                assert CU.clientNode(nearNode);
+
+                                top = topology();
+
+                                topology().readLock();
+                            }
+
+                            try {
+                                if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Client topology version mismatch, need remap lock request [" +
+                                            "reqTopVer=" + req.topologyVersion() +
+                                            ", locTopVer=" + top.topologyVersion() +
+                                            ", req=" + req + ']');
+                                    }
+
+                                    GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+                                        req,
+                                        top.topologyVersion());
+
+                                    return new GridFinishedFuture<>(res);
+                                }
+
+                                fut = new GridDhtLockFuture(ctx,
+                                    nearNode.id(),
+                                    req.version(),
+                                    req.topologyVersion(),
+                                    cnt,
+                                    req.txRead(),
+                                    req.needReturnValue(),
+                                    req.timeout(),
+                                    tx,
+                                    req.threadId(),
+                                    req.accessTtl(),
+                                    filter,
+                                    req.skipStore());
 
-                            // Add before mapping.
-                            if (!ctx.mvcc().addFuture(fut))
-                                throw new IllegalStateException("Duplicate future ID: " + fut);
+                                // Add before mapping.
+                                if (!ctx.mvcc().addFuture(fut))
+                                    throw new IllegalStateException("Duplicate future ID: " + fut);
+                            }
+                            finally {
+                                if (top != null)
+                                    top.readUnlock();
+                            }
                         }
 
                         boolean timedout = false;
@@ -788,45 +817,76 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                         // Handle implicit locks for pessimistic transactions.
                         if (req.inTx()) {
                             if (tx == null) {
-                                tx = new GridDhtTxLocal(
-                                    ctx.shared(),
-                                    nearNode.id(),
-                                    req.version(),
-                                    req.futureId(),
-                                    req.miniId(),
-                                    req.threadId(),
-                                    req.implicitTx(),
-                                    req.implicitSingleTx(),
-                                    ctx.systemTx(),
-                                    false,
-                                    ctx.ioPolicy(),
-                                    PESSIMISTIC,
-                                    req.isolation(),
-                                    req.timeout(),
-                                    req.isInvalidate(),
-                                    false,
-                                    req.txSize(),
-                                    null,
-                                    req.subjectId(),
-                                    req.taskNameHash());
+                                GridDhtPartitionTopology top = null;
 
-                                tx.syncCommit(req.syncCommit());
+                                if (req.firstClientRequest()) {
+                                    assert CU.clientNode(nearNode);
 
-                                tx = ctx.tm().onCreated(null, tx);
+                                    top = topology();
 
-                                if (tx == null || !tx.init()) {
-                                    String msg = "Failed to acquire lock (transaction has been completed): " +
-                                        req.version();
+                                    topology().readLock();
+                                }
 
-                                    U.warn(log, msg);
+                                try {
+                                    if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("Client topology version mismatch, need remap lock request [" +
+                                                "reqTopVer=" + req.topologyVersion() +
+                                                ", locTopVer=" + top.topologyVersion() +
+                                                ", req=" + req + ']');
+                                        }
 
-                                    if (tx != null)
-                                        tx.rollback();
+                                        GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+                                            req,
+                                            top.topologyVersion());
 
-                                    return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
-                                }
+                                        return new GridFinishedFuture<>(res);
+                                    }
 
-                                tx.topologyVersion(req.topologyVersion());
+                                    tx = new GridDhtTxLocal(
+                                        ctx.shared(),
+                                        nearNode.id(),
+                                        req.version(),
+                                        req.futureId(),
+                                        req.miniId(),
+                                        req.threadId(),
+                                        req.implicitTx(),
+                                        req.implicitSingleTx(),
+                                        ctx.systemTx(),
+                                        false,
+                                        ctx.ioPolicy(),
+                                        PESSIMISTIC,
+                                        req.isolation(),
+                                        req.timeout(),
+                                        req.isInvalidate(),
+                                        false,
+                                        req.txSize(),
+                                        null,
+                                        req.subjectId(),
+                                        req.taskNameHash());
+
+                                    tx.syncCommit(req.syncCommit());
+
+                                    tx = ctx.tm().onCreated(null, tx);
+
+                                    if (tx == null || !tx.init()) {
+                                        String msg = "Failed to acquire lock (transaction has been completed): " +
+                                            req.version();
+
+                                        U.warn(log, msg);
+
+                                        if (tx != null)
+                                            tx.rollback();
+
+                                        return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
+                                    }
+
+                                    tx.topologyVersion(req.topologyVersion());
+                                }
+                                finally {
+                                    if (top != null)
+                                        top.readUnlock();
+                                }
                             }
 
                             ctx.tm().txContext(tx);
@@ -947,6 +1007,42 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     }
 
     /**
+     * @param nearNode Client node.
+     * @param req Request.
+     * @param topVer Remap version.
+     * @return Response.
+     */
+    private GridNearLockResponse sendClientLockRemapResponse(ClusterNode nearNode,
+        GridNearLockRequest req,
+        AffinityTopologyVersion topVer) {
+        assert topVer != null;
+
+        GridNearLockResponse res = new GridNearLockResponse(
+            ctx.cacheId(),
+            req.version(),
+            req.futureId(),
+            req.miniId(),
+            false,
+            0,
+            null,
+            topVer);
+
+        try {
+            ctx.io().send(nearNode, res, ctx.ioPolicy());
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send client lock remap response, client node failed " +
+                    "[node=" + nearNode + ", req=" + req + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send client lock remap response [node=" + nearNode + ", req=" + req + ']', e);
+        }
+
+        return res;
+    }
+
+    /**
      * @param nearNode Near node.
      * @param entries Entries.
      * @param req Lock request.
@@ -968,7 +1064,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         try {
             // Send reply back to originating near node.
             GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(),
-                req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err);
+                req.version(),
+                req.futureId(),
+                req.miniId(),
+                tx != null && tx.onePhaseCommit(),
+                entries.size(),
+                err,
+                null);
 
             if (err == null) {
                 res.pending(localDhtPendingVersions(entries, mappedVer));
@@ -1077,8 +1179,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             U.error(log, "Failed to get value for lock reply message for node [node=" +
                 U.toShortString(nearNode) + ", req=" + req + ']', e);
 
-            return new GridNearLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false,
-                entries.size(), e);
+            return new GridNearLockResponse(ctx.cacheId(),
+                req.version(),
+                req.futureId(),
+                req.miniId(),
+                false,
+                entries.size(),
+                e,
+                null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 54b59b8..90edb0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -52,15 +52,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     private static final long serialVersionUID = 0L;
 
     /** Near mappings. */
-    protected Map<UUID, GridDistributedTxMapping> nearMap =
-        new ConcurrentHashMap8<>();
+    protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>();
 
     /** DHT mappings. */
-    protected Map<UUID, GridDistributedTxMapping> dhtMap =
-        new ConcurrentHashMap8<>();
+    protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>();
 
     /** Mapped flag. */
-    private AtomicBoolean mapped = new AtomicBoolean();
+    protected AtomicBoolean mapped = new AtomicBoolean();
 
     /** */
     private long dhtThreadId;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 293cf95..af0fbdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -582,7 +582,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             tx.writeVersion(),
             tx.invalidPartitions(),
             ret,
-            prepErr);
+            prepErr,
+            null);
 
         if (prepErr == null) {
             addDhtValues(res);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8bbfe96..8630421 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -171,7 +171,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         metrics = m;
 
-        preldr = new GridDhtPreloader<>(ctx);
+        preldr = new GridDhtPreloader(ctx);
 
         preldr.start();
 
@@ -737,6 +737,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable final CacheEntryPredicate[] filter,
         final boolean waitTopFut
     ) {
+        assert ctx.updatesAllowed();
+
         if (map != null && keyCheck)
             validateCacheKeys(map.keySet());
 
@@ -793,6 +795,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean rawRetval,
         @Nullable final CacheEntryPredicate[] filter
     ) {
+        assert ctx.updatesAllowed();
+
         final boolean statsEnabled = ctx.config().isStatisticsEnabled();
 
         final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -1024,9 +1028,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         IgniteCacheExpiryPolicy expiry = null;
 
         try {
-            // If batch store update is enabled, we need to lock all entries.
-            // First, need to acquire locks on cache entries, then check filter.
-            List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion());
+            List<GridDhtCacheEntry> locked = null;
             Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
 
             try {
@@ -1043,11 +1045,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     // Do not check topology version for CLOCK versioning since
-                    // partition exchange will wait for near update future.
+                    // partition exchange will wait for near update future (if future is on server node).
                     // Also do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if (topology().topologyVersion().equals(req.topologyVersion()) || req.topologyLocked() ||
-                        ctx.config().getAtomicWriteOrderMode() == CLOCK) {
+                    if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
+                        !needRemap(req.topologyVersion(), topology().topologyVersion())) {
                         ClusterNode node = ctx.discovery().node(nodeId);
 
                         if (node == null) {
@@ -1056,13 +1058,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             return;
                         }
 
+                        // If batch store update is enabled, we need to lock all entries.
+                        // First, need to acquire locks on cache entries, then check filter.
+                        locked = lockEntries(keys, req.topologyVersion());
+
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
                         GridCacheVersion ver = req.updateVersion();
 
                         if (ver == null) {
                             // Assign next version for update inside entries lock.
-                            ver = ctx.versions().next(req.topologyVersion());
+                            ver = ctx.versions().next(topology().topologyVersion());
 
                             if (hasNear)
                                 res.nearVersion(ver);
@@ -1105,7 +1111,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 retVal = updRes.invokeResults();
                         }
                         else {
-                            UpdateSingleResult<K, V> updRes = updateSingle(node,
+                            UpdateSingleResult updRes = updateSingle(node,
                                 hasNear,
                                 req,
                                 res,
@@ -1144,7 +1150,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 e.printStackTrace();
             }
             finally {
-                unlockEntries(locked, req.topologyVersion());
+                if (locked != null)
+                    unlockEntries(locked, req.topologyVersion());
 
                 // Enqueue if necessary after locks release.
                 if (deleted != null) {
@@ -1157,7 +1164,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         catch (GridDhtInvalidPartitionException ignore) {
-            assert ctx.config().getAtomicWriteOrderMode() == PRIMARY;
+            assert !req.fastMap() || req.clientRequest() : req;
 
             if (log.isDebugEnabled())
                 log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
@@ -1605,7 +1612,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @return Return value.
      * @throws GridCacheEntryRemovedException Should be never thrown.
      */
-    private UpdateSingleResult<K, V> updateSingle(
+    private UpdateSingleResult updateSingle(
         ClusterNode node,
         boolean hasNear,
         GridNearAtomicUpdateRequest req,
@@ -1799,7 +1806,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
 
-        return new UpdateSingleResult<>(retVal, deleted, dhtFut);
+        return new UpdateSingleResult(retVal, deleted, dhtFut);
     }
 
     /**
@@ -2572,7 +2579,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /**
      * Result of {@link GridDhtAtomicCache#updateSingle} execution.
      */
-    private static class UpdateSingleResult<K, V> {
+    private static class UpdateSingleResult {
         /** */
         private final GridCacheReturn retVal;
 
@@ -2772,14 +2779,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         /** {@inheritDoc} */
         @Override public void onTimeout() {
             if (guard.compareAndSet(false, true)) {
-                writeLock().lock();
+                ctx.closures().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        writeLock().lock();
 
-                try {
-                    finish();
-                }
-                finally {
-                    writeLock().unlock();
-                }
+                        try {
+                            finish();
+                        }
+                        finally {
+                            writeLock().unlock();
+                        }
+                    }
+                });
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 40ab104..ff8454e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -86,6 +86,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** Future keys. */
     private Collection<KeyCacheObject> keys;
 
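+    /** Wait for partition exchange flag, {@code false} for topology-locked or non-client fast-map requests. */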
+    private boolean waitForExchange;
+
     /**
      * @param cctx Cache context.
      * @param completionCb Callback to invoke when future is completed.
@@ -113,6 +116,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
 
         keys = new ArrayList<>(updateReq.keys().size());
+
+        boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
+
+        waitForExchange = !topLocked;
     }
 
     /** {@inheritDoc} */
@@ -164,8 +171,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** {@inheritDoc} */
     @Override public boolean waitForPartitionExchange() {
-        // Wait dht update futures in PRIMARY mode.
-        return cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+        return waitForExchange;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 76e05e5..07f5ecf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -128,6 +128,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     /** Fast map flag. */
     private final boolean fastMap;
 
+    /** Set when a fast-map remap is requested while mappings are still pending, remap runs on completion. */
+    private boolean fastMapRemap;
+
+    /** Update version assigned on near node in CLOCK mode, kept across remaps once set. */
+    private GridCacheVersion updVer;
+
     /** Near cache flag. */
     private final boolean nearEnabled;
 
@@ -304,11 +310,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
 
         if (topVer == null)
-            mapOnTopology(keys, false, null, waitTopFut);
+            mapOnTopology(null, false, null, waitTopFut);
         else {
             topLocked = true;
 
-            map0(topVer, keys, false, null);
+            map0(topVer, null, false, null);
         }
     }
 
@@ -343,9 +349,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      */
     public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
         if (res.remapKeys() != null) {
-            assert cctx.config().getAtomicWriteOrderMode() == PRIMARY;
+            assert !fastMap || cctx.kernalContext().clientNode();
+
+            Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
 
-            mapOnTopology(res.remapKeys(), true, nodeId, true);
+            mapOnTopology(remapKeys, true, nodeId, true);
 
             return;
         }
@@ -454,9 +462,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             else {
                 if (waitTopFut) {
                     fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override
-                        public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                            mapOnTopology(keys, remap, oldNodeId, waitTopFut);
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                                @Override public void run() {
+                                    mapOnTopology(keys, remap, oldNodeId, waitTopFut);
+                                }
+                            });
                         }
                     });
                 }
@@ -476,29 +487,43 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     /**
      * Checks if future is ready to be completed.
      */
-    private synchronized void checkComplete() {
-        if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
-            CachePartialUpdateCheckedException err0 = err;
+    private void checkComplete() {
+        boolean remap = false;
 
-            if (err0 != null)
-                onDone(err0);
-            else
-                onDone(opRes);
+        synchronized (this) {
+            if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
+                CachePartialUpdateCheckedException err0 = err;
+
+                if (err0 != null)
+                    onDone(err0);
+                else {
+                    if (fastMapRemap) {
+                        assert cctx.kernalContext().clientNode();
+
+                        remap = true;
+                    }
+                    else
+                        onDone(opRes);
+                }
+            }
         }
+
+        if (remap)
+            mapOnTopology(null, true, null, true);
     }
 
     /**
      * @param topVer Topology version.
-     * @param keys Keys to map.
+     * @param remapKeys Keys to remap or {@code null} to map all keys.
      * @param remap Flag indicating if this is partial remap for this future.
      * @param oldNodeId Old node ID if was remap.
      */
     private void map0(
         AffinityTopologyVersion topVer,
-        Collection<?> keys,
+        @Nullable Collection<?> remapKeys,
         boolean remap,
         @Nullable UUID oldNodeId) {
-        assert oldNodeId == null || remap;
+        assert oldNodeId == null || remap || fastMapRemap;
 
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
@@ -519,12 +544,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         CacheConfiguration ccfg = cctx.config();
 
         // Assign version on near node in CLOCK ordering mode even if fastMap is false.
-        GridCacheVersion updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
+        if (updVer == null)
+            updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
 
         if (updVer != null && log.isDebugEnabled())
             log.debug("Assigned fast-map version for update on near node: " + updVer);
 
         if (keys.size() == 1 && !fastMap && (single == null || single)) {
+            assert remapKeys == null || remapKeys.size() == 1 : remapKeys;
+
             Object key = F.first(keys);
 
             Object val;
@@ -610,7 +638,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 filter,
                 subjId,
                 taskNameHash,
-                skipStore);
+                skipStore,
+                cctx.kernalContext().clientNode());
 
             req.addUpdateEntry(cacheKey,
                 val,
@@ -647,9 +676,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         // Must do this in synchronized block because we need to atomically remove and add mapping.
         // Otherwise checkComplete() may see empty intermediate state.
         synchronized (this) {
-            if (remap)
+            if (oldNodeId != null)
                 removeMapping(oldNodeId);
 
+            // For fastMap mode wait for all responses before remapping.
+            if (remap && fastMap && !mappings.isEmpty()) {
+                fastMapRemap = true;
+
+                return;
+            }
+
             // Create mappings first, then send messages.
             for (Object key : keys) {
                 if (key == null) {
@@ -705,6 +741,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                 KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
 
+                if (remapKeys != null && !remapKeys.contains(cacheKey))
+                    continue;
+
                 if (op != TRANSFORM)
                     val = cctx.toCacheObject(val);
 
@@ -748,7 +787,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                             filter,
                             subjId,
                             taskNameHash,
-                            skipStore);
+                            skipStore,
+                            cctx.kernalContext().clientNode());
 
                         pendingMappings.put(nodeId, mapped);
 
@@ -763,6 +803,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     i++;
                 }
             }
+
+            fastMapRemap = false;
         }
 
         if ((single == null || single) && pendingMappings.size() == 1) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index a96a666..86c5ab8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -132,6 +132,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     /** Skip write-through to a persistent storage. */
     private boolean skipStore;
 
+    /** Client node request flag. */
+    private boolean clientReq;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -148,6 +151,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
      * @param fastMap Fast map scheme flag.
      * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
+     * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
      * @param op Cache update operation.
      * @param retval Return value required flag.
@@ -157,6 +161,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
+     * @param clientReq Client node request flag.
      */
     public GridNearAtomicUpdateRequest(
         int cacheId,
@@ -174,7 +179,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean skipStore
+        boolean skipStore,
+        boolean clientReq
     ) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
@@ -193,6 +199,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
         this.skipStore = skipStore;
+        this.clientReq = clientReq;
 
         keys = new ArrayList<>();
     }
@@ -266,6 +273,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     }
 
     /**
+     * @return {@code True} if request sent from client node.
+     */
+    public boolean clientRequest() {
+        return clientReq;
+    }
+
+    /**
      * @return Cache write synchronization mode.
      */
     public CacheWriteSynchronizationMode writeSynchronizationMode() {
@@ -574,126 +588,132 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+                if (!writer.writeBoolean("clientReq", clientReq))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMessage("conflictTtls", conflictTtls))
+                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("conflictTtls", conflictTtls))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeBoolean("fastMap", fastMap))
+                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+                if (!writer.writeBoolean("fastMap", fastMap))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeBoolean("hasPrimary", hasPrimary))
+                if (!writer.writeMessage("futVer", futVer))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeBoolean("hasPrimary", hasPrimary))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeBoolean("retval", retval))
+                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeBoolean("skipStore", skipStore))
+                if (!writer.writeBoolean("retval", retval))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBoolean("skipStore", skipStore))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeBoolean("topLocked", topLocked))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeBoolean("topLocked", topLocked))
                     return false;
 
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeMessage("updateVer", updateVer))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 23:
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 24:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -716,7 +736,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
         switch (reader.state()) {
             case 3:
-                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+                clientReq = reader.readBoolean("clientReq");
 
                 if (!reader.isLastRead())
                     return false;
@@ -724,7 +744,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 4:
-                conflictTtls = reader.readMessage("conflictTtls");
+                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -732,7 +752,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 5:
-                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+                conflictTtls = reader.readMessage("conflictTtls");
 
                 if (!reader.isLastRead())
                     return false;
@@ -740,7 +760,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 6:
-                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -748,7 +768,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 7:
-                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
                     return false;
@@ -756,7 +776,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 8:
-                fastMap = reader.readBoolean("fastMap");
+                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -764,7 +784,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 9:
-                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+                fastMap = reader.readBoolean("fastMap");
 
                 if (!reader.isLastRead())
                     return false;
@@ -772,7 +792,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 10:
-                futVer = reader.readMessage("futVer");
+                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -780,7 +800,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 11:
-                hasPrimary = reader.readBoolean("hasPrimary");
+                futVer = reader.readMessage("futVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -788,7 +808,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 12:
-                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+                hasPrimary = reader.readBoolean("hasPrimary");
 
                 if (!reader.isLastRead())
                     return false;
@@ -796,7 +816,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 13:
-                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -804,6 +824,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 14:
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
                 byte opOrd;
 
                 opOrd = reader.readByte("op");
@@ -815,7 +843,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 15:
+            case 16:
                 retval = reader.readBoolean("retval");
 
                 if (!reader.isLastRead())
@@ -823,7 +851,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 16:
+            case 17:
                 skipStore = reader.readBoolean("skipStore");
 
                 if (!reader.isLastRead())
@@ -831,7 +859,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 17:
+            case 18:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -839,7 +867,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 18:
+            case 19:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -851,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 19:
+            case 20:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -859,7 +887,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 20:
+            case 21:
                 topLocked = reader.readBoolean("topLocked");
 
                 if (!reader.isLastRead())
@@ -867,7 +895,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 21:
+            case 22:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -875,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 22:
+            case 23:
                 updateVer = reader.readMessage("updateVer");
 
                 if (!reader.isLastRead())
@@ -883,7 +911,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 23:
+            case 24:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -903,7 +931,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 24;
+        return 25;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 05b3c7b..221b230 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -362,13 +362,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         @Nullable TransactionIsolation isolation,
         long accessTtl
     ) {
-        assert tx == null || tx instanceof GridNearTxLocal;
+        assert tx == null || tx instanceof GridNearTxLocal : tx;
 
         GridNearTxLocal txx = (GridNearTxLocal)tx;
 
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
-        GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx,
+        GridDhtColocatedLockFuture fut = new GridDhtColocatedLockFuture(ctx,
             keys,
             txx,
             isRead,
@@ -619,7 +619,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @return Lock future.
      */
     IgniteInternalFuture<Exception> lockAllAsync(
-        final GridCacheContext<K, V> cacheCtx,
+        final GridCacheContext<?, ?> cacheCtx,
         @Nullable final GridNearTxLocal tx,
         final long threadId,
         final GridCacheVersion ver,
@@ -700,7 +700,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @return Lock future.
      */
     private IgniteInternalFuture<Exception> lockAllAsync0(
-        GridCacheContext<K, V> cacheCtx,
+        GridCacheContext<?, ?> cacheCtx,
         @Nullable final GridNearTxLocal tx,
         long threadId,
         final GridCacheVersion ver,
@@ -715,7 +715,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         int cnt = keys.size();
 
         if (tx == null) {
-            GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx,
+            GridDhtLockFuture fut = new GridDhtLockFuture(ctx,
                 ctx.localNodeId(),
                 ver,
                 topVer,
@@ -838,7 +838,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         assert nodeId != null;
         assert res != null;
 
-        GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, V>)ctx.mvcc().
+        GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc().
             <Boolean>future(res.version(), res.futureId());
 
         if (fut != null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 372c517..c784948 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.*;
 /**
  * Colocated cache lock future.
  */
-public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean>
     implements GridCacheFuture<Boolean> {
     /** */
     private static final long serialVersionUID = 0L;
@@ -59,7 +59,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
     /** Cache registry. */
     @GridToStringExclude
-    private GridCacheContext<K, V> cctx;
+    private GridCacheContext<?, ?> cctx;
 
     /** Lock owner thread. */
     @GridToStringInclude
@@ -121,10 +121,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @param timeout Lock acquisition timeout.
      * @param accessTtl TTL for read operation.
      * @param filter Filter.
-     * @param skipStore
+     * @param skipStore Skip store flag.
      */
     public GridDhtColocatedLockFuture(
-        GridCacheContext<K, V> cctx,
+        GridCacheContext<?, ?> cctx,
         Collection<KeyCacheObject> keys,
         @Nullable GridNearTxLocal tx,
         boolean read,
@@ -326,13 +326,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * Undoes all locks.
      *
      * @param dist If {@code true}, then remove locks from remote nodes as well.
+     * @param rollback {@code True} if should rollback tx.
      */
-    private void undoLocks(boolean dist) {
+    private void undoLocks(boolean dist, boolean rollback) {
         // Transactions will undo during rollback.
         if (dist && tx == null)
             cctx.colocated().removeLocks(threadId, lockVer, keys);
         else {
-            if (tx != null) {
+            if (rollback && tx != null) {
                 if (tx.setRollbackOnly()) {
                     if (log.isDebugEnabled())
                         log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx);
@@ -346,16 +347,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
     }
 
     /**
-     *
-     * @param dist {@code True} if need to distribute lock release.
-     */
-    private void onFailed(boolean dist) {
-        undoLocks(dist);
-
-        complete(false);
-    }
-
-    /**
      * @param success Success flag.
      */
     public void complete(boolean success) {
@@ -475,7 +466,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                 ", fut=" + this + ']');
 
         if (!success)
-            undoLocks(distribute);
+            undoLocks(distribute, true);
 
         if (tx != null)
             cctx.tm().txContext(tx);
@@ -550,7 +541,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             // Continue mapping on the same topology version as it was before.
             this.topVer.compareAndSet(null, topVer);
 
-            map(keys);
+            map(keys, false);
 
             markInitialized();
 
@@ -558,14 +549,17 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
         }
 
         // Must get topology snapshot and map on that version.
-        mapOnTopology();
+        mapOnTopology(false, null);
     }
 
     /**
      * Acquires topology future and checks it completeness under the read lock. If it is not complete,
      * will asynchronously wait for it's completeness and then try again.
+     *
+     * @param remap Remap flag.
+     * @param c Optional closure to run after map.
      */
-    private void mapOnTopology() {
+    private void mapOnTopology(final boolean remap, @Nullable final Runnable c) {
         // We must acquire topology snapshot from the topology version future.
         cctx.topology().readLock();
 
@@ -589,19 +583,30 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
                 AffinityTopologyVersion topVer = fut.topologyVersion();
 
-                if (tx != null)
-                    tx.topologyVersion(topVer);
+                if (remap) {
+                    if (tx != null)
+                        tx.onRemap(topVer);
+
+                    this.topVer.set(topVer);
+                }
+                else {
+                    if (tx != null)
+                        tx.topologyVersion(topVer);
+
+                    this.topVer.compareAndSet(null, topVer);
+                }
 
-                this.topVer.compareAndSet(null, topVer);
+                map(keys, remap);
 
-                map(keys);
+                if (c != null)
+                    c.run();
 
                 markInitialized();
             }
             else {
                 fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                        mapOnTopology();
+                        mapOnTopology(remap, c);
                     }
                 });
             }
@@ -617,8 +622,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * groups belonging to one primary node and locks for these groups are acquired sequentially.
      *
      * @param keys Keys.
+     * @param remap Remap flag.
      */
-    private void map(Collection<KeyCacheObject> keys) {
+    private void map(Collection<KeyCacheObject> keys, boolean remap) {
         try {
             AffinityTopologyVersion topVer = this.topVer.get();
 
@@ -633,8 +639,12 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                 return;
             }
 
+            boolean clientNode = cctx.kernalContext().clientNode();
+
+            assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
+
             // First assume this node is primary for all keys passed in.
-            if (mapAsPrimary(keys, topVer))
+            if (!clientNode && mapAsPrimary(keys, topVer))
                 return;
 
             Deque<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
@@ -668,6 +678,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
             boolean hasRmtNodes = false;
 
+            boolean first = true;
+
             // Create mini futures.
             for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
                 GridNearLockMapping mapping = iter.next();
@@ -736,6 +748,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
                             if (cand != null && !cand.reentry()) {
                                 if (req == null) {
+                                    boolean clientFirst = false;
+
+                                    if (first) {
+                                        clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+
+                                        first = false;
+                                    }
+
                                     req = new GridNearLockRequest(
                                         cctx.cacheId(),
                                         topVer,
@@ -757,7 +777,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                                         inTx() ? tx.subjectId() : null,
                                         inTx() ? tx.taskNameHash() : 0,
                                         read ? accessTtl : -1L,
-                                        skipStore);
+                                        skipStore,
+                                        clientFirst);
 
                                     mapping.request(req);
                                 }
@@ -815,7 +836,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             if (hasRmtNodes) {
                 trackable = true;
 
-                if (!cctx.mvcc().addFuture(this))
+                if (!remap && !cctx.mvcc().addFuture(this))
                     throw new IllegalStateException("Duplicate future ID: " + this);
             }
             else
@@ -1249,75 +1270,111 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                     return;
                 }
 
-                int i = 0;
+                if (res.clientRemapVersion() != null) {
+                    assert cctx.kernalContext().clientNode();
+
+                    IgniteInternalFuture<?> affFut =
+                        cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
+
+                    if (affFut != null && !affFut.isDone()) {
+                        affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override public void apply(IgniteInternalFuture<?> fut) {
+                                remap();
+                            }
+                        });
+                    }
+                    else
+                        remap();
+                }
+                else  {
+                    int i = 0;
 
-                for (KeyCacheObject k : keys) {
-                    IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
+                    for (KeyCacheObject k : keys) {
+                        IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
 
-                    CacheObject newVal = res.value(i);
+                        CacheObject newVal = res.value(i);
 
-                    GridCacheVersion dhtVer = res.dhtVersion(i);
+                        GridCacheVersion dhtVer = res.dhtVersion(i);
 
-                    if (newVal == null) {
-                        if (oldValTup != null) {
-                            if (oldValTup.get1().equals(dhtVer))
-                                newVal = oldValTup.get2();
+                        if (newVal == null) {
+                            if (oldValTup != null) {
+                                if (oldValTup.get1().equals(dhtVer))
+                                    newVal = oldValTup.get2();
+                            }
                         }
-                    }
 
-                    if (inTx()) {
-                        IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
+                        if (inTx()) {
+                            IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
+
+                            // In colocated cache we must receive responses only for detached entries.
+                            assert txEntry.cached().detached() : txEntry;
 
-                        // In colocated cache we must receive responses only for detached entries.
-                        assert txEntry.cached().detached();
+                            txEntry.markLocked();
 
-                        txEntry.markLocked();
+                            GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
 
-                        GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
+                            if (res.dhtVersion(i) == null) {
+                                onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+                                    "(will fail the lock): " + res));
+
+                                return;
+                            }
 
-                        if (res.dhtVersion(i) == null) {
-                            onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
-                                "(will fail the lock): " + res));
+                            // Set value to detached entry.
+                            entry.resetFromPrimary(newVal, dhtVer);
 
-                            return;
+                            tx.hasRemoteLocks(true);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+                        }
+                        else
+                            cctx.mvcc().markExplicitOwner(k, threadId);
+
+                        if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+                            cctx.events().addEvent(cctx.affinity().partition(k),
+                                k,
+                                tx,
+                                null,
+                                EVT_CACHE_OBJECT_READ,
+                                newVal,
+                                newVal != null,
+                                null,
+                                false,
+                                CU.subjectId(tx, cctx.shared()),
+                                null,
+                                tx == null ? null : tx.resolveTaskName());
                         }
 
-                        // Set value to detached entry.
-                        entry.resetFromPrimary(newVal, dhtVer);
+                        i++;
+                    }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+                    try {
+                        proceedMapping(mappings);
                     }
-                    else
-                        cctx.mvcc().markExplicitOwner(k, threadId);
-
-                    if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
-                        cctx.events().addEvent(cctx.affinity().partition(k),
-                            k,
-                            tx,
-                            null,
-                            EVT_CACHE_OBJECT_READ,
-                            newVal,
-                            newVal != null,
-                            null,
-                            false,
-                            CU.subjectId(tx, cctx.shared()),
-                            null,
-                            tx == null ? null : tx.resolveTaskName());
+                    catch (IgniteCheckedException e) {
+                        onDone(e);
                     }
 
-                    i++;
+                    onDone(true);
                 }
+            }
+        }
 
-                try {
-                    proceedMapping(mappings);
-                }
-                catch (IgniteCheckedException e) {
-                    onDone(e);
-                }
+        /**
+         *
+         */
+        private void remap() {
+            undoLocks(false, false);
 
-                onDone(true);
-            }
+            for (KeyCacheObject key : GridDhtColocatedLockFuture.this.keys)
+                cctx.mvcc().removeExplicitLock(threadId, key, lockVer);
+
+            mapOnTopology(true, new Runnable() {
+                @Override public void run() {
+                    onDone(true);
+                }
+            });
         }
 
         /** {@inheritDoc} */


Mime
View raw message