ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [23/28] incubator-ignite git commit: ignite-545: merge from sprint-6
Date Wed, 10 Jun 2015 14:11:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 26eef50..703daf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -81,7 +81,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        preldr = new GridDhtPreloader<>(ctx);
+        preldr = new GridDhtPreloader(ctx);
 
         preldr.start();
 
@@ -518,7 +518,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             return;
         }
 
-        // Group lock can be only started from local node, so we never start group lock transaction on remote node.
         IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null);
 
         // Register listener just so we print out errors.
@@ -534,8 +533,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) {
         assert nodeId != null;
         assert res != null;
-        GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(),
-            res.futureId());
+        GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId());
 
         if (fut == null) {
             if (log.isDebugEnabled())
@@ -604,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
         assert tx != null;
 
-        GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(
+        GridDhtLockFuture fut = new GridDhtLockFuture(
             ctx,
             tx.nearNodeId(),
             tx.nearXidVersion(),
@@ -669,7 +667,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      * @return Future.
      */
     public IgniteInternalFuture<GridNearLockResponse> lockAllAsync(
-        final GridCacheContext<K, V> cacheCtx,
+        final GridCacheContext<?, ?> cacheCtx,
         final ClusterNode nearNode,
         final GridNearLockRequest req,
         @Nullable final CacheEntryPredicate[] filter0) {
@@ -719,26 +717,57 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                         if (filter == null)
                             filter = req.filter();
 
-                        GridDhtLockFuture<K, V> fut = null;
+                        GridDhtLockFuture fut = null;
 
                         if (!req.inTx()) {
-                            fut = new GridDhtLockFuture<>(ctx,
-                                nearNode.id(),
-                                req.version(),
-                                req.topologyVersion(),
-                                cnt,
-                                req.txRead(),
-                                req.needReturnValue(),
-                                req.timeout(),
-                                tx,
-                                req.threadId(),
-                                req.accessTtl(),
-                                filter,
-                                req.skipStore());
+                            GridDhtPartitionTopology top = null;
+
+                            if (req.firstClientRequest()) {
+                                assert CU.clientNode(nearNode);
+
+                                top = topology();
+
+                                topology().readLock();
+                            }
+
+                            try {
+                                if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Client topology version mismatch, need remap lock request [" +
+                                            "reqTopVer=" + req.topologyVersion() +
+                                            ", locTopVer=" + top.topologyVersion() +
+                                            ", req=" + req + ']');
+                                    }
+
+                                    GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+                                        req,
+                                        top.topologyVersion());
+
+                                    return new GridFinishedFuture<>(res);
+                                }
+
+                                fut = new GridDhtLockFuture(ctx,
+                                    nearNode.id(),
+                                    req.version(),
+                                    req.topologyVersion(),
+                                    cnt,
+                                    req.txRead(),
+                                    req.needReturnValue(),
+                                    req.timeout(),
+                                    tx,
+                                    req.threadId(),
+                                    req.accessTtl(),
+                                    filter,
+                                    req.skipStore());
 
-                            // Add before mapping.
-                            if (!ctx.mvcc().addFuture(fut))
-                                throw new IllegalStateException("Duplicate future ID: " + fut);
+                                // Add before mapping.
+                                if (!ctx.mvcc().addFuture(fut))
+                                    throw new IllegalStateException("Duplicate future ID: " + fut);
+                            }
+                            finally {
+                                if (top != null)
+                                    top.readUnlock();
+                            }
                         }
 
                         boolean timedout = false;
@@ -788,45 +817,76 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                         // Handle implicit locks for pessimistic transactions.
                         if (req.inTx()) {
                             if (tx == null) {
-                                tx = new GridDhtTxLocal(
-                                    ctx.shared(),
-                                    nearNode.id(),
-                                    req.version(),
-                                    req.futureId(),
-                                    req.miniId(),
-                                    req.threadId(),
-                                    req.implicitTx(),
-                                    req.implicitSingleTx(),
-                                    ctx.systemTx(),
-                                    false,
-                                    ctx.ioPolicy(),
-                                    PESSIMISTIC,
-                                    req.isolation(),
-                                    req.timeout(),
-                                    req.isInvalidate(),
-                                    false,
-                                    req.txSize(),
-                                    null,
-                                    req.subjectId(),
-                                    req.taskNameHash());
+                                GridDhtPartitionTopology top = null;
 
-                                tx.syncCommit(req.syncCommit());
+                                if (req.firstClientRequest()) {
+                                    assert CU.clientNode(nearNode);
 
-                                tx = ctx.tm().onCreated(null, tx);
+                                    top = topology();
 
-                                if (tx == null || !tx.init()) {
-                                    String msg = "Failed to acquire lock (transaction has been completed): " +
-                                        req.version();
+                                    topology().readLock();
+                                }
 
-                                    U.warn(log, msg);
+                                try {
+                                    if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("Client topology version mismatch, need remap lock request [" +
+                                                "reqTopVer=" + req.topologyVersion() +
+                                                ", locTopVer=" + top.topologyVersion() +
+                                                ", req=" + req + ']');
+                                        }
 
-                                    if (tx != null)
-                                        tx.rollback();
+                                        GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+                                            req,
+                                            top.topologyVersion());
 
-                                    return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
-                                }
+                                        return new GridFinishedFuture<>(res);
+                                    }
 
-                                tx.topologyVersion(req.topologyVersion());
+                                    tx = new GridDhtTxLocal(
+                                        ctx.shared(),
+                                        nearNode.id(),
+                                        req.version(),
+                                        req.futureId(),
+                                        req.miniId(),
+                                        req.threadId(),
+                                        req.implicitTx(),
+                                        req.implicitSingleTx(),
+                                        ctx.systemTx(),
+                                        false,
+                                        ctx.ioPolicy(),
+                                        PESSIMISTIC,
+                                        req.isolation(),
+                                        req.timeout(),
+                                        req.isInvalidate(),
+                                        false,
+                                        req.txSize(),
+                                        null,
+                                        req.subjectId(),
+                                        req.taskNameHash());
+
+                                    tx.syncCommit(req.syncCommit());
+
+                                    tx = ctx.tm().onCreated(null, tx);
+
+                                    if (tx == null || !tx.init()) {
+                                        String msg = "Failed to acquire lock (transaction has been completed): " +
+                                            req.version();
+
+                                        U.warn(log, msg);
+
+                                        if (tx != null)
+                                            tx.rollback();
+
+                                        return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
+                                    }
+
+                                    tx.topologyVersion(req.topologyVersion());
+                                }
+                                finally {
+                                    if (top != null)
+                                        top.readUnlock();
+                                }
                             }
 
                             ctx.tm().txContext(tx);
@@ -947,6 +1007,42 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     }
 
     /**
+     * @param nearNode Client node.
+     * @param req Request.
+     * @param topVer Remap version.
+     * @return Response.
+     */
+    private GridNearLockResponse sendClientLockRemapResponse(ClusterNode nearNode,
+        GridNearLockRequest req,
+        AffinityTopologyVersion topVer) {
+        assert topVer != null;
+
+        GridNearLockResponse res = new GridNearLockResponse(
+            ctx.cacheId(),
+            req.version(),
+            req.futureId(),
+            req.miniId(),
+            false,
+            0,
+            null,
+            topVer);
+
+        try {
+            ctx.io().send(nearNode, res, ctx.ioPolicy());
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send client lock remap response, client node failed " +
+                    "[node=" + nearNode + ", req=" + req + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send client lock remap response [node=" + nearNode + ", req=" + req + ']', e);
+        }
+
+        return res;
+    }
+
+    /**
      * @param nearNode Near node.
      * @param entries Entries.
      * @param req Lock request.
@@ -968,7 +1064,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         try {
             // Send reply back to originating near node.
             GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(),
-                req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err);
+                req.version(),
+                req.futureId(),
+                req.miniId(),
+                tx != null && tx.onePhaseCommit(),
+                entries.size(),
+                err,
+                null);
 
             if (err == null) {
                 res.pending(localDhtPendingVersions(entries, mappedVer));
@@ -1077,8 +1179,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             U.error(log, "Failed to get value for lock reply message for node [node=" +
                 U.toShortString(nearNode) + ", req=" + req + ']', e);
 
-            return new GridNearLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false,
-                entries.size(), e);
+            return new GridNearLockResponse(ctx.cacheId(),
+                req.version(),
+                req.futureId(),
+                req.miniId(),
+                false,
+                entries.size(),
+                e,
+                null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 54b59b8..90edb0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -52,15 +52,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     private static final long serialVersionUID = 0L;
 
     /** Near mappings. */
-    protected Map<UUID, GridDistributedTxMapping> nearMap =
-        new ConcurrentHashMap8<>();
+    protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>();
 
     /** DHT mappings. */
-    protected Map<UUID, GridDistributedTxMapping> dhtMap =
-        new ConcurrentHashMap8<>();
+    protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>();
 
     /** Mapped flag. */
-    private AtomicBoolean mapped = new AtomicBoolean();
+    protected AtomicBoolean mapped = new AtomicBoolean();
 
     /** */
     private long dhtThreadId;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 293cf95..af0fbdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -582,7 +582,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             tx.writeVersion(),
             tx.invalidPartitions(),
             ret,
-            prepErr);
+            prepErr,
+            null);
 
         if (prepErr == null) {
             addDhtValues(res);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8bbfe96..8630421 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -171,7 +171,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         metrics = m;
 
-        preldr = new GridDhtPreloader<>(ctx);
+        preldr = new GridDhtPreloader(ctx);
 
         preldr.start();
 
@@ -737,6 +737,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable final CacheEntryPredicate[] filter,
         final boolean waitTopFut
     ) {
+        assert ctx.updatesAllowed();
+
         if (map != null && keyCheck)
             validateCacheKeys(map.keySet());
 
@@ -793,6 +795,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean rawRetval,
         @Nullable final CacheEntryPredicate[] filter
     ) {
+        assert ctx.updatesAllowed();
+
         final boolean statsEnabled = ctx.config().isStatisticsEnabled();
 
         final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -1024,9 +1028,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         IgniteCacheExpiryPolicy expiry = null;
 
         try {
-            // If batch store update is enabled, we need to lock all entries.
-            // First, need to acquire locks on cache entries, then check filter.
-            List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion());
+            List<GridDhtCacheEntry> locked = null;
             Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
 
             try {
@@ -1043,11 +1045,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     // Do not check topology version for CLOCK versioning since
-                    // partition exchange will wait for near update future.
+                    // partition exchange will wait for near update future (if future is on server node).
                     // Also do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if (topology().topologyVersion().equals(req.topologyVersion()) || req.topologyLocked() ||
-                        ctx.config().getAtomicWriteOrderMode() == CLOCK) {
+                    if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
+                        !needRemap(req.topologyVersion(), topology().topologyVersion())) {
                         ClusterNode node = ctx.discovery().node(nodeId);
 
                         if (node == null) {
@@ -1056,13 +1058,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             return;
                         }
 
+                        // If batch store update is enabled, we need to lock all entries.
+                        // First, need to acquire locks on cache entries, then check filter.
+                        locked = lockEntries(keys, req.topologyVersion());
+
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
                         GridCacheVersion ver = req.updateVersion();
 
                         if (ver == null) {
                             // Assign next version for update inside entries lock.
-                            ver = ctx.versions().next(req.topologyVersion());
+                            ver = ctx.versions().next(topology().topologyVersion());
 
                             if (hasNear)
                                 res.nearVersion(ver);
@@ -1105,7 +1111,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 retVal = updRes.invokeResults();
                         }
                         else {
-                            UpdateSingleResult<K, V> updRes = updateSingle(node,
+                            UpdateSingleResult updRes = updateSingle(node,
                                 hasNear,
                                 req,
                                 res,
@@ -1144,7 +1150,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 e.printStackTrace();
             }
             finally {
-                unlockEntries(locked, req.topologyVersion());
+                if (locked != null)
+                    unlockEntries(locked, req.topologyVersion());
 
                 // Enqueue if necessary after locks release.
                 if (deleted != null) {
@@ -1157,7 +1164,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         catch (GridDhtInvalidPartitionException ignore) {
-            assert ctx.config().getAtomicWriteOrderMode() == PRIMARY;
+            assert !req.fastMap() || req.clientRequest() : req;
 
             if (log.isDebugEnabled())
                 log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
@@ -1605,7 +1612,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @return Return value.
      * @throws GridCacheEntryRemovedException Should be never thrown.
      */
-    private UpdateSingleResult<K, V> updateSingle(
+    private UpdateSingleResult updateSingle(
         ClusterNode node,
         boolean hasNear,
         GridNearAtomicUpdateRequest req,
@@ -1799,7 +1806,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
 
-        return new UpdateSingleResult<>(retVal, deleted, dhtFut);
+        return new UpdateSingleResult(retVal, deleted, dhtFut);
     }
 
     /**
@@ -2572,7 +2579,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /**
      * Result of {@link GridDhtAtomicCache#updateSingle} execution.
      */
-    private static class UpdateSingleResult<K, V> {
+    private static class UpdateSingleResult {
         /** */
         private final GridCacheReturn retVal;
 
@@ -2772,14 +2779,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         /** {@inheritDoc} */
         @Override public void onTimeout() {
             if (guard.compareAndSet(false, true)) {
-                writeLock().lock();
+                ctx.closures().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        writeLock().lock();
 
-                try {
-                    finish();
-                }
-                finally {
-                    writeLock().unlock();
-                }
+                        try {
+                            finish();
+                        }
+                        finally {
+                            writeLock().unlock();
+                        }
+                    }
+                });
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 40ab104..ff8454e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -86,6 +86,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** Future keys. */
     private Collection<KeyCacheObject> keys;
 
+    /** */
+    private boolean waitForExchange;
+
     /**
      * @param cctx Cache context.
      * @param completionCb Callback to invoke when future is completed.
@@ -113,6 +116,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
 
         keys = new ArrayList<>(updateReq.keys().size());
+
+        boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
+
+        waitForExchange = !topLocked;
     }
 
     /** {@inheritDoc} */
@@ -164,8 +171,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** {@inheritDoc} */
     @Override public boolean waitForPartitionExchange() {
-        // Wait dht update futures in PRIMARY mode.
-        return cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+        return waitForExchange;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 76e05e5..07f5ecf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -128,6 +128,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     /** Fast map flag. */
     private final boolean fastMap;
 
+    /** */
+    private boolean fastMapRemap;
+
+    /** */
+    private GridCacheVersion updVer;
+
     /** Near cache flag. */
     private final boolean nearEnabled;
 
@@ -304,11 +310,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
 
         if (topVer == null)
-            mapOnTopology(keys, false, null, waitTopFut);
+            mapOnTopology(null, false, null, waitTopFut);
         else {
             topLocked = true;
 
-            map0(topVer, keys, false, null);
+            map0(topVer, null, false, null);
         }
     }
 
@@ -343,9 +349,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      */
     public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
         if (res.remapKeys() != null) {
-            assert cctx.config().getAtomicWriteOrderMode() == PRIMARY;
+            assert !fastMap || cctx.kernalContext().clientNode();
+
+            Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
 
-            mapOnTopology(res.remapKeys(), true, nodeId, true);
+            mapOnTopology(remapKeys, true, nodeId, true);
 
             return;
         }
@@ -454,9 +462,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             else {
                 if (waitTopFut) {
                     fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override
-                        public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                            mapOnTopology(keys, remap, oldNodeId, waitTopFut);
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                                @Override public void run() {
+                                    mapOnTopology(keys, remap, oldNodeId, waitTopFut);
+                                }
+                            });
                         }
                     });
                 }
@@ -476,29 +487,43 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     /**
      * Checks if future is ready to be completed.
      */
-    private synchronized void checkComplete() {
-        if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
-            CachePartialUpdateCheckedException err0 = err;
+    private void checkComplete() {
+        boolean remap = false;
 
-            if (err0 != null)
-                onDone(err0);
-            else
-                onDone(opRes);
+        synchronized (this) {
+            if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
+                CachePartialUpdateCheckedException err0 = err;
+
+                if (err0 != null)
+                    onDone(err0);
+                else {
+                    if (fastMapRemap) {
+                        assert cctx.kernalContext().clientNode();
+
+                        remap = true;
+                    }
+                    else
+                        onDone(opRes);
+                }
+            }
         }
+
+        if (remap)
+            mapOnTopology(null, true, null, true);
     }
 
     /**
      * @param topVer Topology version.
-     * @param keys Keys to map.
+     * @param remapKeys Keys to remap or {@code null} to map all keys.
      * @param remap Flag indicating if this is partial remap for this future.
      * @param oldNodeId Old node ID if was remap.
      */
     private void map0(
         AffinityTopologyVersion topVer,
-        Collection<?> keys,
+        @Nullable Collection<?> remapKeys,
         boolean remap,
         @Nullable UUID oldNodeId) {
-        assert oldNodeId == null || remap;
+        assert oldNodeId == null || remap || fastMapRemap;
 
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
@@ -519,12 +544,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         CacheConfiguration ccfg = cctx.config();
 
         // Assign version on near node in CLOCK ordering mode even if fastMap is false.
-        GridCacheVersion updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
+        if (updVer == null)
+            updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
 
         if (updVer != null && log.isDebugEnabled())
             log.debug("Assigned fast-map version for update on near node: " + updVer);
 
         if (keys.size() == 1 && !fastMap && (single == null || single)) {
+            assert remapKeys == null || remapKeys.size() == 1 : remapKeys;
+
             Object key = F.first(keys);
 
             Object val;
@@ -610,7 +638,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 filter,
                 subjId,
                 taskNameHash,
-                skipStore);
+                skipStore,
+                cctx.kernalContext().clientNode());
 
             req.addUpdateEntry(cacheKey,
                 val,
@@ -647,9 +676,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         // Must do this in synchronized block because we need to atomically remove and add mapping.
         // Otherwise checkComplete() may see empty intermediate state.
         synchronized (this) {
-            if (remap)
+            if (oldNodeId != null)
                 removeMapping(oldNodeId);
 
+            // For fastMap mode wait for all responses before remapping.
+            if (remap && fastMap && !mappings.isEmpty()) {
+                fastMapRemap = true;
+
+                return;
+            }
+
             // Create mappings first, then send messages.
             for (Object key : keys) {
                 if (key == null) {
@@ -705,6 +741,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                 KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
 
+                if (remapKeys != null && !remapKeys.contains(cacheKey))
+                    continue;
+
                 if (op != TRANSFORM)
                     val = cctx.toCacheObject(val);
 
@@ -748,7 +787,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                             filter,
                             subjId,
                             taskNameHash,
-                            skipStore);
+                            skipStore,
+                            cctx.kernalContext().clientNode());
 
                         pendingMappings.put(nodeId, mapped);
 
@@ -763,6 +803,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     i++;
                 }
             }
+
+            fastMapRemap = false;
         }
 
         if ((single == null || single) && pendingMappings.size() == 1) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index a96a666..86c5ab8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -132,6 +132,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     /** Skip write-through to a persistent storage. */
     private boolean skipStore;
 
+    /** */
+    private boolean clientReq;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -148,6 +151,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
      * @param fastMap Fast map scheme flag.
      * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
+     * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
      * @param op Cache update operation.
      * @param retval Return value required flag.
@@ -157,6 +161,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
+     * @param clientReq Client node request flag.
      */
     public GridNearAtomicUpdateRequest(
         int cacheId,
@@ -174,7 +179,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean skipStore
+        boolean skipStore,
+        boolean clientReq
     ) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
@@ -193,6 +199,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
         this.skipStore = skipStore;
+        this.clientReq = clientReq;
 
         keys = new ArrayList<>();
     }
@@ -266,6 +273,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     }
 
     /**
+     * @return {@code True} if request sent from client node.
+     */
+    public boolean clientRequest() {
+        return clientReq;
+    }
+
+    /**
      * @return Cache write synchronization mode.
      */
     public CacheWriteSynchronizationMode writeSynchronizationMode() {
@@ -574,126 +588,132 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+                if (!writer.writeBoolean("clientReq", clientReq))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMessage("conflictTtls", conflictTtls))
+                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("conflictTtls", conflictTtls))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeBoolean("fastMap", fastMap))
+                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+                if (!writer.writeBoolean("fastMap", fastMap))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeBoolean("hasPrimary", hasPrimary))
+                if (!writer.writeMessage("futVer", futVer))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeBoolean("hasPrimary", hasPrimary))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeBoolean("retval", retval))
+                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeBoolean("skipStore", skipStore))
+                if (!writer.writeBoolean("retval", retval))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBoolean("skipStore", skipStore))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeBoolean("topLocked", topLocked))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeBoolean("topLocked", topLocked))
                     return false;
 
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeMessage("updateVer", updateVer))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 23:
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 24:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -716,7 +736,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
         switch (reader.state()) {
             case 3:
-                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+                clientReq = reader.readBoolean("clientReq");
 
                 if (!reader.isLastRead())
                     return false;
@@ -724,7 +744,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 4:
-                conflictTtls = reader.readMessage("conflictTtls");
+                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -732,7 +752,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 5:
-                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+                conflictTtls = reader.readMessage("conflictTtls");
 
                 if (!reader.isLastRead())
                     return false;
@@ -740,7 +760,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 6:
-                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -748,7 +768,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 7:
-                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
                     return false;
@@ -756,7 +776,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 8:
-                fastMap = reader.readBoolean("fastMap");
+                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -764,7 +784,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 9:
-                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+                fastMap = reader.readBoolean("fastMap");
 
                 if (!reader.isLastRead())
                     return false;
@@ -772,7 +792,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 10:
-                futVer = reader.readMessage("futVer");
+                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -780,7 +800,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 11:
-                hasPrimary = reader.readBoolean("hasPrimary");
+                futVer = reader.readMessage("futVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -788,7 +808,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 12:
-                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+                hasPrimary = reader.readBoolean("hasPrimary");
 
                 if (!reader.isLastRead())
                     return false;
@@ -796,7 +816,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 13:
-                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -804,6 +824,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 14:
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
                 byte opOrd;
 
                 opOrd = reader.readByte("op");
@@ -815,7 +843,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 15:
+            case 16:
                 retval = reader.readBoolean("retval");
 
                 if (!reader.isLastRead())
@@ -823,7 +851,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 16:
+            case 17:
                 skipStore = reader.readBoolean("skipStore");
 
                 if (!reader.isLastRead())
@@ -831,7 +859,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 17:
+            case 18:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -839,7 +867,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 18:
+            case 19:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -851,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 19:
+            case 20:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -859,7 +887,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 20:
+            case 21:
                 topLocked = reader.readBoolean("topLocked");
 
                 if (!reader.isLastRead())
@@ -867,7 +895,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 21:
+            case 22:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -875,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 22:
+            case 23:
                 updateVer = reader.readMessage("updateVer");
 
                 if (!reader.isLastRead())
@@ -883,7 +911,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 23:
+            case 24:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -903,7 +931,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 24;
+        return 25;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 05b3c7b..221b230 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -362,13 +362,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         @Nullable TransactionIsolation isolation,
         long accessTtl
     ) {
-        assert tx == null || tx instanceof GridNearTxLocal;
+        assert tx == null || tx instanceof GridNearTxLocal : tx;
 
         GridNearTxLocal txx = (GridNearTxLocal)tx;
 
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
-        GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx,
+        GridDhtColocatedLockFuture fut = new GridDhtColocatedLockFuture(ctx,
             keys,
             txx,
             isRead,
@@ -619,7 +619,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @return Lock future.
      */
     IgniteInternalFuture<Exception> lockAllAsync(
-        final GridCacheContext<K, V> cacheCtx,
+        final GridCacheContext<?, ?> cacheCtx,
         @Nullable final GridNearTxLocal tx,
         final long threadId,
         final GridCacheVersion ver,
@@ -700,7 +700,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @return Lock future.
      */
     private IgniteInternalFuture<Exception> lockAllAsync0(
-        GridCacheContext<K, V> cacheCtx,
+        GridCacheContext<?, ?> cacheCtx,
         @Nullable final GridNearTxLocal tx,
         long threadId,
         final GridCacheVersion ver,
@@ -715,7 +715,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         int cnt = keys.size();
 
         if (tx == null) {
-            GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx,
+            GridDhtLockFuture fut = new GridDhtLockFuture(ctx,
                 ctx.localNodeId(),
                 ver,
                 topVer,
@@ -838,7 +838,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         assert nodeId != null;
         assert res != null;
 
-        GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, V>)ctx.mvcc().
+        GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc().
             <Boolean>future(res.version(), res.futureId());
 
         if (fut != null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 372c517..c784948 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.*;
 /**
  * Colocated cache lock future.
  */
-public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean>
     implements GridCacheFuture<Boolean> {
     /** */
     private static final long serialVersionUID = 0L;
@@ -59,7 +59,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
     /** Cache registry. */
     @GridToStringExclude
-    private GridCacheContext<K, V> cctx;
+    private GridCacheContext<?, ?> cctx;
 
     /** Lock owner thread. */
     @GridToStringInclude
@@ -121,10 +121,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @param timeout Lock acquisition timeout.
      * @param accessTtl TTL for read operation.
      * @param filter Filter.
-     * @param skipStore
+     * @param skipStore Skip store flag.
      */
     public GridDhtColocatedLockFuture(
-        GridCacheContext<K, V> cctx,
+        GridCacheContext<?, ?> cctx,
         Collection<KeyCacheObject> keys,
         @Nullable GridNearTxLocal tx,
         boolean read,
@@ -326,13 +326,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * Undoes all locks.
      *
      * @param dist If {@code true}, then remove locks from remote nodes as well.
+     * @param rollback {@code True} if should rollback tx.
      */
-    private void undoLocks(boolean dist) {
+    private void undoLocks(boolean dist, boolean rollback) {
         // Transactions will undo during rollback.
         if (dist && tx == null)
             cctx.colocated().removeLocks(threadId, lockVer, keys);
         else {
-            if (tx != null) {
+            if (rollback && tx != null) {
                 if (tx.setRollbackOnly()) {
                     if (log.isDebugEnabled())
                         log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx);
@@ -346,16 +347,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
     }
 
     /**
-     *
-     * @param dist {@code True} if need to distribute lock release.
-     */
-    private void onFailed(boolean dist) {
-        undoLocks(dist);
-
-        complete(false);
-    }
-
-    /**
      * @param success Success flag.
      */
     public void complete(boolean success) {
@@ -475,7 +466,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                 ", fut=" + this + ']');
 
         if (!success)
-            undoLocks(distribute);
+            undoLocks(distribute, true);
 
         if (tx != null)
             cctx.tm().txContext(tx);
@@ -550,7 +541,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             // Continue mapping on the same topology version as it was before.
             this.topVer.compareAndSet(null, topVer);
 
-            map(keys);
+            map(keys, false);
 
             markInitialized();
 
@@ -558,14 +549,17 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
         }
 
         // Must get topology snapshot and map on that version.
-        mapOnTopology();
+        mapOnTopology(false, null);
     }
 
     /**
      * Acquires topology future and checks it completeness under the read lock. If it is not complete,
      * will asynchronously wait for it's completeness and then try again.
+     *
+     * @param remap Remap flag.
+     * @param c Optional closure to run after map.
      */
-    private void mapOnTopology() {
+    private void mapOnTopology(final boolean remap, @Nullable final Runnable c) {
         // We must acquire topology snapshot from the topology version future.
         cctx.topology().readLock();
 
@@ -589,19 +583,30 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
                 AffinityTopologyVersion topVer = fut.topologyVersion();
 
-                if (tx != null)
-                    tx.topologyVersion(topVer);
+                if (remap) {
+                    if (tx != null)
+                        tx.onRemap(topVer);
+
+                    this.topVer.set(topVer);
+                }
+                else {
+                    if (tx != null)
+                        tx.topologyVersion(topVer);
+
+                    this.topVer.compareAndSet(null, topVer);
+                }
 
-                this.topVer.compareAndSet(null, topVer);
+                map(keys, remap);
 
-                map(keys);
+                if (c != null)
+                    c.run();
 
                 markInitialized();
             }
             else {
                 fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                        mapOnTopology();
+                        mapOnTopology(remap, c);
                     }
                 });
             }
@@ -617,8 +622,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * groups belonging to one primary node and locks for these groups are acquired sequentially.
      *
      * @param keys Keys.
+     * @param remap Remap flag.
      */
-    private void map(Collection<KeyCacheObject> keys) {
+    private void map(Collection<KeyCacheObject> keys, boolean remap) {
         try {
             AffinityTopologyVersion topVer = this.topVer.get();
 
@@ -633,8 +639,12 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                 return;
             }
 
+            boolean clientNode = cctx.kernalContext().clientNode();
+
+            assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
+
             // First assume this node is primary for all keys passed in.
-            if (mapAsPrimary(keys, topVer))
+            if (!clientNode && mapAsPrimary(keys, topVer))
                 return;
 
             Deque<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
@@ -668,6 +678,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
             boolean hasRmtNodes = false;
 
+            boolean first = true;
+
             // Create mini futures.
             for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
                 GridNearLockMapping mapping = iter.next();
@@ -736,6 +748,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
                             if (cand != null && !cand.reentry()) {
                                 if (req == null) {
+                                    boolean clientFirst = false;
+
+                                    if (first) {
+                                        clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+
+                                        first = false;
+                                    }
+
                                     req = new GridNearLockRequest(
                                         cctx.cacheId(),
                                         topVer,
@@ -757,7 +777,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                                         inTx() ? tx.subjectId() : null,
                                         inTx() ? tx.taskNameHash() : 0,
                                         read ? accessTtl : -1L,
-                                        skipStore);
+                                        skipStore,
+                                        clientFirst);
 
                                     mapping.request(req);
                                 }
@@ -815,7 +836,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             if (hasRmtNodes) {
                 trackable = true;
 
-                if (!cctx.mvcc().addFuture(this))
+                if (!remap && !cctx.mvcc().addFuture(this))
                     throw new IllegalStateException("Duplicate future ID: " + this);
             }
             else
@@ -1249,75 +1270,111 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                     return;
                 }
 
-                int i = 0;
+                if (res.clientRemapVersion() != null) {
+                    assert cctx.kernalContext().clientNode();
+
+                    IgniteInternalFuture<?> affFut =
+                        cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
+
+                    if (affFut != null && !affFut.isDone()) {
+                        affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override public void apply(IgniteInternalFuture<?> fut) {
+                                remap();
+                            }
+                        });
+                    }
+                    else
+                        remap();
+                }
+                else  {
+                    int i = 0;
 
-                for (KeyCacheObject k : keys) {
-                    IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
+                    for (KeyCacheObject k : keys) {
+                        IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
 
-                    CacheObject newVal = res.value(i);
+                        CacheObject newVal = res.value(i);
 
-                    GridCacheVersion dhtVer = res.dhtVersion(i);
+                        GridCacheVersion dhtVer = res.dhtVersion(i);
 
-                    if (newVal == null) {
-                        if (oldValTup != null) {
-                            if (oldValTup.get1().equals(dhtVer))
-                                newVal = oldValTup.get2();
+                        if (newVal == null) {
+                            if (oldValTup != null) {
+                                if (oldValTup.get1().equals(dhtVer))
+                                    newVal = oldValTup.get2();
+                            }
                         }
-                    }
 
-                    if (inTx()) {
-                        IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
+                        if (inTx()) {
+                            IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
+
+                            // In colocated cache we must receive responses only for detached entries.
+                            assert txEntry.cached().detached() : txEntry;
 
-                        // In colocated cache we must receive responses only for detached entries.
-                        assert txEntry.cached().detached();
+                            txEntry.markLocked();
 
-                        txEntry.markLocked();
+                            GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
 
-                        GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
+                            if (res.dhtVersion(i) == null) {
+                                onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+                                    "(will fail the lock): " + res));
+
+                                return;
+                            }
 
-                        if (res.dhtVersion(i) == null) {
-                            onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
-                                "(will fail the lock): " + res));
+                            // Set value to detached entry.
+                            entry.resetFromPrimary(newVal, dhtVer);
 
-                            return;
+                            tx.hasRemoteLocks(true);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+                        }
+                        else
+                            cctx.mvcc().markExplicitOwner(k, threadId);
+
+                        if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+                            cctx.events().addEvent(cctx.affinity().partition(k),
+                                k,
+                                tx,
+                                null,
+                                EVT_CACHE_OBJECT_READ,
+                                newVal,
+                                newVal != null,
+                                null,
+                                false,
+                                CU.subjectId(tx, cctx.shared()),
+                                null,
+                                tx == null ? null : tx.resolveTaskName());
                         }
 
-                        // Set value to detached entry.
-                        entry.resetFromPrimary(newVal, dhtVer);
+                        i++;
+                    }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+                    try {
+                        proceedMapping(mappings);
                     }
-                    else
-                        cctx.mvcc().markExplicitOwner(k, threadId);
-
-                    if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
-                        cctx.events().addEvent(cctx.affinity().partition(k),
-                            k,
-                            tx,
-                            null,
-                            EVT_CACHE_OBJECT_READ,
-                            newVal,
-                            newVal != null,
-                            null,
-                            false,
-                            CU.subjectId(tx, cctx.shared()),
-                            null,
-                            tx == null ? null : tx.resolveTaskName());
+                    catch (IgniteCheckedException e) {
+                        onDone(e);
                     }
 
-                    i++;
+                    onDone(true);
                 }
+            }
+        }
 
-                try {
-                    proceedMapping(mappings);
-                }
-                catch (IgniteCheckedException e) {
-                    onDone(e);
-                }
+        /**
+         *
+         */
+        private void remap() {
+            undoLocks(false, false);
 
-                onDone(true);
-            }
+            for (KeyCacheObject key : GridDhtColocatedLockFuture.this.keys)
+                cctx.mvcc().removeExplicitLock(threadId, key, lockVer);
+
+            mapOnTopology(true, new Runnable() {
+                @Override public void run() {
+                    onDone(true);
+                }
+            });
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 78966d0..1d57ef7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -80,7 +80,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
     private IgniteUuid futId = IgniteUuid.randomUuid();
 
     /** Preloader. */
-    private GridDhtPreloader<K, V> preloader;
+    private GridDhtPreloader preloader;
 
     /** Trackable flag. */
     private boolean trackable;
@@ -95,7 +95,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
         GridCacheContext<K, V> cctx,
         AffinityTopologyVersion topVer,
         Collection<KeyCacheObject> keys,
-        GridDhtPreloader<K, V> preloader
+        GridDhtPreloader preloader
     ) {
         assert topVer.topologyVersion() != 0 : topVer;
         assert !F.isEmpty(keys) : keys;
@@ -208,21 +208,21 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @return {@code True} if some mapping was added.
      */
     private boolean map(Iterable<KeyCacheObject> keys, Collection<ClusterNode> exc) {
-        Map<ClusterNode, Set<KeyCacheObject>> mappings = new HashMap<>();
-
-        ClusterNode loc = cctx.localNode();
-
-        int curTopVer = topCntr.get();
+        Map<ClusterNode, Set<KeyCacheObject>> mappings = null;
 
         for (KeyCacheObject key : keys)
-            map(key, mappings, exc);
+            mappings = map(key, mappings, exc);
 
         if (isDone())
             return false;
 
         boolean ret = false;
 
-        if (!mappings.isEmpty()) {
+        if (mappings != null) {
+            ClusterNode loc = cctx.localNode();
+
+            int curTopVer = topCntr.get();
+
             preloader.addFuture(this);
 
             trackable = true;
@@ -275,22 +275,27 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param key Key.
      * @param exc Exclude nodes.
      * @param mappings Mappings.
+     * @return Mappings.
      */
-    private void map(KeyCacheObject key, Map<ClusterNode, Set<KeyCacheObject>> mappings, Collection<ClusterNode> exc) {
+    private Map<ClusterNode, Set<KeyCacheObject>> map(KeyCacheObject key,
+        @Nullable Map<ClusterNode, Set<KeyCacheObject>> mappings,
+        Collection<ClusterNode> exc)
+    {
         ClusterNode loc = cctx.localNode();
 
-        int part = cctx.affinity().partition(key);
-
         GridCacheEntryEx e = cctx.dht().peekEx(key);
 
         try {
             if (e != null && !e.isNewLocked()) {
-                if (log.isDebugEnabled())
+                if (log.isDebugEnabled()) {
+                    int part = cctx.affinity().partition(key);
+
                     log.debug("Will not rebalance key (entry is not new) [cacheName=" + cctx.name() +
                         ", key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']');
+                }
 
                 // Key has been rebalanced or retrieved already.
-                return;
+                return mappings;
             }
         }
         catch (GridCacheEntryRemovedException ignore) {
@@ -299,6 +304,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                     ", locId=" + cctx.nodeId() + ']');
         }
 
+        int part = cctx.affinity().partition(key);
+
         List<ClusterNode> owners = F.isEmpty(exc) ? top.owners(part, topVer) :
             new ArrayList<>(F.view(top.owners(part, topVer), F.notIn(exc)));
 
@@ -308,7 +315,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                     "topVer=" + topVer + ", locId=" + cctx.nodeId() + ']');
 
             // Key is already rebalanced.
-            return;
+            return mappings;
         }
 
         // Create partition.
@@ -337,9 +344,12 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                     log.debug("Will not rebalance key (no nodes to request from with rebalancing disabled) [key=" +
                         key + ", part=" + part + ", locId=" + cctx.nodeId() + ']');
 
-                return;
+                return mappings;
             }
 
+            if (mappings == null)
+                mappings = U.newHashMap(keys.size());
+
             Collection<KeyCacheObject> mappedKeys = F.addIfAbsent(mappings, pick, F.<KeyCacheObject>newSet());
 
             assert mappedKeys != null;
@@ -357,6 +367,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                 log.debug("Will not rebalance key (local partition is not MOVING) [cacheName=" + cctx.name() +
                     ", key=" + key + ", part=" + locPart + ", locId=" + cctx.nodeId() + ']');
         }
+
+        return mappings;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 633f237..a6e6c4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -53,12 +53,12 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
  * and populating local cache.
  */
 @SuppressWarnings("NonConstantFieldWithUpperCaseName")
-public class GridDhtPartitionDemandPool<K, V> {
+public class GridDhtPartitionDemandPool {
     /** Dummy message to wake up a blocking queue if a node leaves. */
     private final SupplyMessage DUMMY_TOP = new SupplyMessage();
 
     /** */
-    private final GridCacheContext<K, V> cctx;
+    private final GridCacheContext<?, ?> cctx;
 
     /** */
     private final IgniteLogger log;
@@ -99,7 +99,7 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @param cctx Cache context.
      * @param busyLock Shutdown lock.
      */
-    public GridDhtPartitionDemandPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) {
+    public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
         assert cctx != null;
         assert busyLock != null;
 
@@ -108,9 +108,11 @@ public class GridDhtPartitionDemandPool<K, V> {
 
         log = cctx.logger(getClass());
 
-        poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
+        boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
 
-        if (poolSize > 0) {
+        poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
+
+        if (enabled) {
             barrier = new CyclicBarrier(poolSize);
 
             dmdWorkers = new ArrayList<>(poolSize);
@@ -327,7 +329,7 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
      */
-    void addAssignments(final GridDhtPreloaderAssignments<K, V> assigns, boolean force) {
+    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
 
@@ -399,7 +401,7 @@ public class GridDhtPartitionDemandPool<K, V> {
         private int id;
 
         /** Partition-to-node assignments. */
-        private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> assignQ = new LinkedBlockingDeque<>();
+        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
 
         /** Message queue. */
         private final LinkedBlockingDeque<SupplyMessage> msgQ =
@@ -425,7 +427,7 @@ public class GridDhtPartitionDemandPool<K, V> {
         /**
          * @param assigns Assignments.
          */
-        void addAssignments(GridDhtPreloaderAssignments<K, V> assigns) {
+        void addAssignments(GridDhtPreloaderAssignments assigns) {
             assert assigns != null;
 
             assignQ.offer(assigns);
@@ -885,7 +887,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                     }
 
                     // Sync up all demand threads at this step.
-                    GridDhtPreloaderAssignments<K, V> assigns = null;
+                    GridDhtPreloaderAssignments assigns = null;
 
                     while (assigns == null)
                         assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
@@ -995,12 +997,12 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @param exchFut Exchange future.
      * @return Assignments of partitions to nodes.
      */
-    GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+    GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
         // No assignments for disabled preloader.
         GridDhtPartitionTopology top = cctx.dht().topology();
 
         if (!cctx.rebalanceEnabled())
-            return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+            return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
         int partCnt = cctx.affinity().partitions();
 
@@ -1009,7 +1011,7 @@ public class GridDhtPartitionDemandPool<K, V> {
             "Topology version mismatch [exchId=" + exchFut.exchangeId() +
                 ", topVer=" + top.topologyVersion() + ']';
 
-        GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
         AffinityTopologyVersion topVer = assigns.topologyVersion();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index facf7e3..faa6cf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -237,7 +237,7 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
      * @return Full string representation.
      */
     public String toFullString() {
-        return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", super.toString());
+        return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", map.toString());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 5d9677d..13cfef3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -43,9 +43,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 /**
  * Thread pool for supplying partitions to demanding nodes.
  */
-class GridDhtPartitionSupplyPool<K, V> {
+class GridDhtPartitionSupplyPool {
     /** */
-    private final GridCacheContext<K, V> cctx;
+    private final GridCacheContext<?, ?> cctx;
 
     /** */
     private final IgniteLogger log;
@@ -72,7 +72,7 @@ class GridDhtPartitionSupplyPool<K, V> {
      * @param cctx Cache context.
      * @param busyLock Shutdown lock.
      */
-    GridDhtPartitionSupplyPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) {
+    GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
         assert cctx != null;
         assert busyLock != null;
 
@@ -83,16 +83,18 @@ class GridDhtPartitionSupplyPool<K, V> {
 
         top = cctx.dht().topology();
 
-        int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
+        if (!cctx.kernalContext().clientNode()) {
+            int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
 
-        for (int i = 0; i < poolSize; i++)
-            workers.add(new SupplyWorker());
+            for (int i = 0; i < poolSize; i++)
+                workers.add(new SupplyWorker());
 
-        cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
-            @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                processDemandMessage(id, m);
-            }
-        });
+            cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                    processDemandMessage(id, m);
+                }
+            });
+        }
 
         depEnabled = cctx.gridDeploy().enabled();
     }
@@ -248,11 +250,6 @@ class GridDhtPartitionSupplyPool<K, V> {
             boolean ack = false;
 
             try {
-                // Partition map exchange is finished which means that all near transactions with given
-                // topology version are committed. We can wait for local locks here as it will not take
-                // much time.
-                cctx.mvcc().finishLocks(d.topologyVersion()).get();
-
                 for (int part : d.partitions()) {
                     GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
 


Mime
View raw message