ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: ignite-4705
Date Thu, 02 Mar 2017 21:08:21 GMT
ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c4394f0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c4394f0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c4394f0

Branch: refs/heads/ignite-4705-2
Commit: 6c4394f098595ff2fd01b18d204c9acde39cbf15
Parents: 30bfae0
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Mar 2 22:06:31 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Mar 3 00:08:05 2017 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   2 -
 .../processors/cache/GridCacheIoManager.java    |   2 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |  22 +--
 .../GridDhtAtomicAbstractUpdateRequest.java     |  38 ++---
 .../dht/atomic/GridDhtAtomicCache.java          |  36 +++--
 .../dht/atomic/GridDhtAtomicNearResponse.java   |  18 +--
 .../GridDhtAtomicSingleUpdateRequest.java       |  18 +--
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  62 ++++----
 .../dht/atomic/GridDhtAtomicUpdateResponse.java |   4 +-
 .../GridNearAtomicAbstractUpdateFuture.java     | 152 +++++++-----------
 .../GridNearAtomicAbstractUpdateRequest.java    | 155 +++++++++++++++++++
 .../atomic/GridNearAtomicFullUpdateRequest.java | 155 ++++++++++++++++++-
 ...GridNearAtomicSingleUpdateFilterRequest.java |   6 +-
 .../GridNearAtomicSingleUpdateFuture.java       |  63 ++++++--
 ...GridNearAtomicSingleUpdateInvokeRequest.java |  10 +-
 .../GridNearAtomicSingleUpdateRequest.java      |  10 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 110 ++++++++++---
 .../atomic/GridNearAtomicUpdateResponse.java    |  38 ++---
 .../atomic/IgniteCacheAtomicProtocolTest.java   | 103 ++++++++++++
 19 files changed, 740 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 537d495..a1cbe72 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMappingResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
@@ -196,7 +195,6 @@ public class MessageCodeGenerator {
 //        gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
 //        gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
 //        gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
-//        gen.generateAndWrite(GridNearAtomicMappingResponse.class);
         //gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class);
 
 //        gen.generateAndWrite(GridMessageCollection.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index d1a6753..0d67e3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -898,7 +898,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      */
     @SuppressWarnings("unchecked")
     public void send(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
-        assert !node.isLocal();
+        assert !node.isLocal() : node;
 
         if (!onSend(msg, node.id()))
             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 96f3611..39059ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -305,7 +305,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /** {@inheritDoc} */
     @Override public final boolean onNodeLeft(UUID nodeId) {
-        boolean res = registerResponse(nodeId, true, null);
+        boolean res = registerResponse(nodeId);
 
         if (res && msgLog.isDebugEnabled()) {
             msgLog.debug("DTH update fut, node left [futId=" + futId + ", writeVer=" + writeVer +
@@ -317,11 +317,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /**
      * @param nodeId Node ID.
-     * @param nodeErr Node error flag.
-     * @param errors Errors instance if DHT node failed to unmarshal message.
      * @return {@code True} if request found.
      */
-    private boolean registerResponse(UUID nodeId, boolean nodeErr, UpdateErrors errors) {
+    private boolean registerResponse(UUID nodeId) {
         int resCnt0;
 
         GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
@@ -457,23 +455,25 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                         ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
                 }
 
-                registerResponse(req.nodeId(), true, null);
+                registerResponse(req.nodeId());
             }
             catch (IgniteCheckedException ignored) {
                 U.error(msgLog, "Failed to send request [futId=" + futId +
                     ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
 
-                registerResponse(req.nodeId(), true, null);
+                registerResponse(req.nodeId());
             }
         }
     }
 
     /**
      * @param nodeId Node ID.
-     * @param errors Response.
+     * @param res Response.
      */
-    public final void onDhtErrorResponse(UUID nodeId, UpdateErrors errors) {
-        registerResponse(nodeId, false, errors);
+    public final void onDhtResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
+        assert !updateReq.dhtReplyToNear();
+
+        registerResponse(nodeId);
     }
 
     /**
@@ -481,11 +481,11 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      *
      * @param nodeId Backup node ID.
      */
-    public final void onResult(UUID nodeId) {
+    public final void onDeferredResponse(UUID nodeId) {
         if (log.isDebugEnabled())
             log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
 
-        registerResponse(nodeId, false, null);
+        registerResponse(nodeId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index e04e381..e86c3a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -430,7 +430,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 12;
     }
 
     /** {@inheritDoc} */
@@ -448,55 +448,55 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
         }
 
         switch (writer.state()) {
-            case 4:
+            case 3:
                 if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
-            case 5:
+            case 4:
                 if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 5:
                 if (!writer.writeLong("nearFutId", nearFutId))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 6:
                 if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 7:
                 if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 8:
                 if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
-            case 10:
+            case 9:
                 if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
-            case 11:
+            case 10:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
-            case 12:
+            case 11:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -518,7 +518,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
             return false;
 
         switch (reader.state()) {
-            case 4:
+            case 3:
                 flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
@@ -526,7 +526,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
 
                 reader.incrementState();
 
-            case 5:
+            case 4:
                 futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
@@ -534,7 +534,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
 
                 reader.incrementState();
 
-            case 6:
+            case 5:
                 nearFutId = reader.readLong("nearFutId");
 
                 if (!reader.isLastRead())
@@ -542,7 +542,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
 
                 reader.incrementState();
 
-            case 7:
+            case 6:
                 nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
@@ -550,7 +550,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
 
                 reader.incrementState();
 
-            case 8:
+            case 7:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -558,7 +558,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
 
                 reader.incrementState();
 
-            case 9:
+            case 8:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -570,7 +570,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
 
                 reader.incrementState();
 
-            case 10:
+            case 9:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -578,7 +578,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
 
                 reader.incrementState();
 
-            case 11:
+            case 10:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -586,7 +586,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
 
                 reader.incrementState();
 
-            case 12:
+            case 11:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index f5d06dd..ca78186 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -3059,7 +3059,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         if (fut != null)
             fut.onPrimaryResponse(nodeId, res, false);
-
         else
             U.warn(msgLog, "Failed to find near update future for update response (will ignore) " +
                 "[futId=" + res.futureId() + ", node=" + nodeId + ", res=" + res + ']');
@@ -3397,7 +3396,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private void sendDhtNearResponse(final GridDhtAtomicAbstractUpdateRequest req, GridDhtAtomicNearResponse nearRes) {
         try {
-            ctx.gridIO().send(req.nearNodeId(), TOPIC_CACHE, nearRes, ctx.ioPolicy());
+            ClusterNode node = ctx.discovery().node(req.nearNodeId());
+
+            if (node == null)
+                throw new ClusterTopologyCheckedException("Node failed: " + req.nearNodeId());
+
+            if (node.isLocal())
+                processDhtAtomicNearResponse(node.id(), nearRes);
+            else
+                ctx.io().send(node, nearRes, ctx.ioPolicy());
 
             if (msgLog.isDebugEnabled()) {
                 msgLog.debug("Sent DHT near response [futId=" + req.futureId() +
@@ -3452,8 +3459,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("unchecked")
     private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
-        assert !F.isEmpty(res.nearEvicted()) || res.error() != null : res;
-
         if (!F.isEmpty(res.nearEvicted())) {
             for (KeyCacheObject key : res.nearEvicted()) {
                 try {
@@ -3469,22 +3474,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
 
-        if (res.errors() != null) {
-            GridDhtAtomicAbstractUpdateFuture updateFut =
+
+        GridDhtAtomicAbstractUpdateFuture updateFut =
                 (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
 
-            if (updateFut != null) {
-                if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Received DHT atomic update response [futId=" + res.futureId() +
+        if (updateFut != null) {
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Received DHT atomic update response [futId=" + res.futureId() +
                         ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']');
-                }
-
-                updateFut.onDhtErrorResponse(nodeId, res.errors());
             }
-            else {
-                U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureId() +
+
+            updateFut.onDhtResponse(nodeId, res);
+        }
+        else {
+            U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureId() +
                     ", node=" + nodeId + ", res=" + res + ']');
-            }
         }
     }
 
@@ -3509,7 +3513,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         ", writeVer=" + res + ", node=" + nodeId + ']');
                 }
 
-                updateFut.onResult(nodeId);
+                updateFut.onDeferredResponse(nodeId);
             }
             else {
                 U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + id +

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index 595e41a..b2cc6e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -178,7 +178,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 10;
+        return 8;
     }
 
     /** {@inheritDoc} */
@@ -223,25 +223,25 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
                 writer.incrementState();
 
-            case 5:
+            case 4:
                 if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 5:
                 if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 6:
                 if (!writer.writeInt("partId", partId))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 7:
                 if (!writer.writeUuid("primaryId", primaryId))
                     return false;
 
@@ -271,7 +271,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 5:
+            case 4:
                 flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
@@ -279,7 +279,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 6:
+            case 5:
                 futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
@@ -287,7 +287,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 8:
+            case 6:
                 partId = reader.readInt("partId");
 
                 if (!reader.isLastRead())
@@ -295,7 +295,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 9:
+            case 7:
                 primaryId = reader.readUuid("primaryId");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index 3d3ce04..10dc77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -345,25 +345,25 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
         }
 
         switch (writer.state()) {
-            case 13:
+            case 12:
                 if (!writer.writeMessage("key", key))
                     return false;
 
                 writer.incrementState();
 
-            case 14:
+            case 13:
                 if (!writer.writeMessage("prevVal", prevVal))
                     return false;
 
                 writer.incrementState();
 
-            case 15:
+            case 14:
                 if (!writer.writeLong("updateCntr", updateCntr))
                     return false;
 
                 writer.incrementState();
 
-            case 16:
+            case 15:
                 if (!writer.writeMessage("val", val))
                     return false;
 
@@ -385,7 +385,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
             return false;
 
         switch (reader.state()) {
-            case 13:
+            case 12:
                 key = reader.readMessage("key");
 
                 if (!reader.isLastRead())
@@ -393,7 +393,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 14:
+            case 13:
                 prevVal = reader.readMessage("prevVal");
 
                 if (!reader.isLastRead())
@@ -401,7 +401,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 15:
+            case 14:
                 updateCntr = reader.readLong("updateCntr");
 
                 if (!reader.isLastRead())
@@ -409,7 +409,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 16:
+            case 15:
                 val = reader.readMessage("val");
 
                 if (!reader.isLastRead())
@@ -459,7 +459,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 17;
+        return 16;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 029ea42..78368fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -532,91 +532,91 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
         }
 
         switch (writer.state()) {
-            case 13:
+            case 12:
                 if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
                     return false;
 
                 writer.incrementState();
 
-            case 14:
+            case 13:
                 if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 15:
+            case 14:
                 if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
-            case 16:
+            case 15:
                 if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups))
                     return false;
 
                 writer.incrementState();
 
-            case 17:
+            case 16:
                 if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
-            case 18:
+            case 17:
                 if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 19:
+            case 18:
                 if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
-            case 20:
+            case 19:
                 if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
                     return false;
 
                 writer.incrementState();
 
-            case 21:
+            case 20:
                 if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 22:
+            case 21:
                 if (!writer.writeMessage("nearTtls", nearTtls))
                     return false;
 
                 writer.incrementState();
 
-            case 23:
+            case 22:
                 if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 24:
+            case 23:
                 if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 25:
+            case 24:
                 if (!writer.writeMessage("ttls", ttls))
                     return false;
 
                 writer.incrementState();
 
-            case 26:
+            case 25:
                 if (!writer.writeMessage("updateCntrs", updateCntrs))
                     return false;
 
                 writer.incrementState();
 
-            case 27:
+            case 26:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -638,7 +638,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
             return false;
 
         switch (reader.state()) {
-            case 13:
+            case 12:
                 conflictExpireTimes = reader.readMessage("conflictExpireTimes");
 
                 if (!reader.isLastRead())
@@ -646,7 +646,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 14:
+            case 13:
                 conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -654,7 +654,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 15:
+            case 14:
                 entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
@@ -662,7 +662,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 16:
+            case 15:
                 forceTransformBackups = reader.readBoolean("forceTransformBackups");
 
                 if (!reader.isLastRead())
@@ -670,7 +670,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 17:
+            case 16:
                 invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
 
                 if (!reader.isLastRead())
@@ -678,7 +678,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 18:
+            case 17:
                 keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -686,7 +686,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 19:
+            case 18:
                 nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
@@ -694,7 +694,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 20:
+            case 19:
                 nearExpireTimes = reader.readMessage("nearExpireTimes");
 
                 if (!reader.isLastRead())
@@ -702,7 +702,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 21:
+            case 20:
                 nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -710,7 +710,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 22:
+            case 21:
                 nearTtls = reader.readMessage("nearTtls");
 
                 if (!reader.isLastRead())
@@ -718,7 +718,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 23:
+            case 22:
                 nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -726,7 +726,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 24:
+            case 23:
                 prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -734,7 +734,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 25:
+            case 24:
                 ttls = reader.readMessage("ttls");
 
                 if (!reader.isLastRead())
@@ -742,7 +742,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 26:
+            case 25:
                 updateCntrs = reader.readMessage("updateCntrs");
 
                 if (!reader.isLastRead())
@@ -750,7 +750,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 27:
+            case 26:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -776,7 +776,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 28;
+        return 27;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 10806b1..0134f64 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -211,7 +211,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeMessage("errors", errs))
+                if (!writer.writeMessage("errs", errs))
                     return false;
 
                 writer.incrementState();
@@ -251,7 +251,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
         switch (reader.state()) {
             case 3:
-                errs = reader.readMessage("errors");
+                errs = reader.readMessage("errs");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 6228f03..e174bd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -325,11 +326,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         final GridNearAtomicAbstractUpdateRequest req;
 
         /** */
-        private Set<UUID> rcvd;
-
-        /** */
         @GridToStringInclude
-        private Set<UUID> mapping;
+        private Set<UUID> dhtNodes;
 
         /** */
         private boolean hasRes;
@@ -337,67 +335,66 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         /**
          * @param req Request.
          */
-        PrimaryRequestState(GridNearAtomicAbstractUpdateRequest req) {
+        PrimaryRequestState(GridNearAtomicAbstractUpdateRequest req, List<ClusterNode> nodes, boolean single) {
             assert req != null && req.nodeId() != null : req;
 
             this.req = req;
+
+            if (req.dhtReplyToNear()) {
+                if (single) {
+                    if (nodes.size() > 1) {
+                        dhtNodes = U.newHashSet(nodes.size() - 1);
+
+                        for (int i = 1; i < nodes.size(); i++)
+                            dhtNodes.add(nodes.get(i).id());
+                    }
+                    else
+                        dhtNodes = Collections.emptySet();
+                }
+                else {
+                    dhtNodes = new HashSet<>();
+
+                    for (int i = 1; i < nodes.size(); i++)
+                        dhtNodes.add(nodes.get(i).id());
+                }
+            }
         }
 
-        /**
-         * @return {@code True} if all expected responses are received.
-         */
-        private boolean finished() {
-            return mapping != null && mapping.isEmpty() && hasRes;
+        void addMapping(List<ClusterNode> nodes) {
+            assert req.dhtReplyToNear();
+
+            for (int i = 1; i < nodes.size(); i++)
+                dhtNodes.add(nodes.get(i).id());
         }
 
-        void initAffinityMapping(GridCacheContext cctx, UUID skipNodeId) {
-            assert req.size() == 1 : req;
+        boolean checkDhtNodes(GridCacheContext cctx) {
+            if (finished())
+                return false;
 
-            List<ClusterNode> nodes =
-                cctx.affinity().nodesByPartition(req.key(0).partition(), req.topologyVersion());
+            boolean finished = false;
 
-            if (nodes.size() == 1)
-                mapping = Collections.emptySet();
-            else {
-                for (int i = 1; i < nodes.size(); i++) {
-                    ClusterNode dhtNode = nodes.get(i);
+            for (Iterator<UUID> it = dhtNodes.iterator(); it.hasNext();) {
+                UUID nodeId = it.next();
 
-                    if (dhtNode.id().equals(skipNodeId) || (rcvd != null && rcvd.contains(dhtNode.id())))
-                        continue;
+                if (!cctx.discovery().alive(nodeId)) {
+                    it.remove();
 
-                    if (cctx.discovery().node(dhtNode.id()) != null) {
-                        if (mapping == null)
-                            mapping = U.newHashSet(nodes.size() - 1);
+                    if (finished()) {
+                        finished = true;
 
-                        mapping.add(dhtNode.id());
+                        break;
                     }
                 }
-
-                if (mapping == null)
-                    mapping = Collections.emptySet();
             }
+
+            return finished;
         }
 
         /**
-         * @param cctx Context.
-         * @param nodeIds DHT nodes.
-         * @param skipNodeId Node ID to skip.
+         * @return {@code True} if all expected responses are received.
          */
-        void initMapping(GridCacheContext cctx, List<UUID> nodeIds, @Nullable UUID skipNodeId) {
-            for (UUID dhtNodeId : nodeIds) {
-                if (dhtNodeId.equals(skipNodeId) || (rcvd != null && rcvd.contains(dhtNodeId)))
-                    continue;
-
-                if (cctx.discovery().node(dhtNodeId) != null) {
-                    if (mapping == null)
-                        mapping = U.newHashSet(nodeIds.size());
-
-                    mapping.add(dhtNodeId);
-                }
-            }
-
-            if (mapping == null)
-                mapping = Collections.emptySet();
+        private boolean finished() {
+            return req.dhtReplyToNear() ? (dhtNodes.isEmpty() && hasRes) : hasRes;
         }
 
         /**
@@ -419,72 +416,36 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @return {@code True} if request processing finished.
          */
         boolean onNodeLeft(UUID nodeId) {
-            if (finished())
+            if (!req.dhtReplyToNear() || finished())
                 return false;
 
-            if (mapping != null && mapping.remove(nodeId))
-                return finished();
-
-            return false;
+            return dhtNodes.remove(nodeId) && finished();
         }
 
         /**
          * TODO 4705: check response for errors.
          *
-         * @param cctx Context.
          * @param nodeId Node ID.
          * @param res Response.
          * @return {@code True} if request processing finished.
          */
-        boolean onDhtResponse(GridCacheContext cctx, UUID nodeId, GridDhtAtomicNearResponse res) {
+        boolean onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+            assert req.dhtReplyToNear();
+
             if (finished())
                 return false;
 
-//            if (res.primaryDhtFailureResponse()) {
-//                assert res.mapping() != null : res;
-//                assert res.failedNodeId() != null : res;
-//
-//                nodeId = res.failedNodeId();
-//            }
-//
-//            if (res.hasResult())
-//                hasRes = true;
-//
-//            if (res.affinityMapping()) {
-//                if (mapping == null) {
-//                    initAffinityMapping(cctx, nodeId);
-//
-//                    return finished();
-//                }
-//            } else if (res.mapping() != null) {
-//                // Mapping is sent from dht nodes.
-//                if (mapping == null) {
-//                    initMapping(cctx, res.mapping(), nodeId);
-//
-//                    return finished();
-//                }
-//            }
-//            else {
-//                // Mapping and result are sent from primary.
-//                if (mapping == null) {
-//                    if (rcvd == null)
-//                        rcvd = new HashSet<>();
-//
-//                    rcvd.add(nodeId);
-//
-//                    return false; // Need wait for response from primary.
-//                }
-//            }
-
-            return mapping.remove(nodeId) && finished();
+            if (res.hasResult())
+                hasRes = true;
+
+            return dhtNodes != null && dhtNodes.remove(nodeId) && finished();
         }
 
         /**
-         * @param cctx Context.
          * @param res Response.
          * @return {@code True} if request processing finished.
          */
-        boolean onPrimaryResponse(GridCacheContext cctx, GridNearAtomicUpdateResponse res) {
+        boolean onPrimaryResponse(GridNearAtomicUpdateResponse res) {
             assert !finished() : this;
 
             hasRes = true;
@@ -498,13 +459,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
             assert res.returnValue() != null : res;
 
-//            if (res.mapping() != null) {
-//                if (mapping == null)
-//                    initMapping(cctx, res.mapping(), null);
-//            }
-//            else
-//                initAffinityMapping(cctx, null);
-
             return finished();
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index a40cfe0..64839d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
@@ -33,6 +34,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -154,6 +157,11 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /** {@inheritDoc} */
+    @Override public final AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
     @Override public final int lookupIndex() {
         return CACHE_MSG_IDX;
     }
@@ -401,4 +409,151 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      * @return Key.
      */
     public abstract KeyCacheObject key(int idx);
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 10;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                byte opOrd;
+
+                opOrd = reader.readByte("op");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                op = GridCacheOperation.fromOrdinal(opOrd);
+
+                reader.incrementState();
+
+            case 6:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                byte syncModeOrd;
+
+                syncModeOrd = reader.readByte("syncMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+                reader.incrementState();
+
+            case 8:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicAbstractUpdateRequest.class);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 331bcdd..81b89e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
@@ -432,11 +433,163 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 10:
+                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeMessage("conflictTtls", conflictTtls))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
+                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 15:
+                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 16:
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 17:
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 18:
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
         return true;
     }
 
     /** {@inheritDoc} */
     @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 10:
+                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                conflictTtls = reader.readMessage("conflictTtls");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
+                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
+                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 16:
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 17:
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 18:
+                vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
         return reader.afterMessageRead(GridNearAtomicFullUpdateRequest.class);
     }
 
@@ -459,7 +612,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 27;
+        return 19;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index b9ded3d..b078b8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -166,7 +166,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
         }
 
         switch (writer.state()) {
-            case 14:
+            case 12:
                 if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
                     return false;
 
@@ -188,7 +188,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
             return false;
 
         switch (reader.state()) {
-            case 14:
+            case 12:
                 filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
 
                 if (!reader.isLastRead())
@@ -208,7 +208,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 15;
+        return 13;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index a741735..b4f88aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
@@ -43,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -229,7 +231,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             if (opRes == null && res.hasResult())
                 opRes = res.result();
 
-            if (reqState.onDhtResponse(cctx, nodeId, res)) {
+            if (reqState.onDhtResponse(nodeId, res)) {
                 opRes0 = opRes;
                 err0 = err;
                 remapTopVer0 = onAllReceived();
@@ -315,7 +317,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
                 assert reqState != null;
 
-                if (!reqState.onPrimaryResponse(cctx, res))
+                if (!reqState.onPrimaryResponse(res))
                     return;
             }
 
@@ -492,10 +494,10 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         Long futId = cctx.mvcc().atomicFutureId();
 
         Exception err = null;
-        GridNearAtomicAbstractUpdateRequest singleReq0 = null;
+        PrimaryRequestState reqState0 = null;
 
         try {
-            singleReq0 = mapSingleUpdate(topVer, futId);
+            reqState0 = mapSingleUpdate(topVer, futId);
 
             synchronized (mux) {
                 assert this.futId == null : this;
@@ -504,7 +506,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 this.topVer = topVer;
                 this.futId = futId;
 
-                reqState = new PrimaryRequestState(singleReq0);
+                reqState = reqState0;
             }
 
             if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) {
@@ -512,6 +514,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
                 return;
             }
+
+            if (reqState.req.dhtReplyToNear() && (cctx.discovery().topologyVersion() != topVer.topologyVersion())) {
+                if (!checkDhtNodes(futId))
+                    return;
+            }
         }
         catch (Exception e) {
             err = e;
@@ -524,7 +531,36 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         }
 
         // Optimize mapping for single key.
-        mapSingle(singleReq0.nodeId(), singleReq0);
+        mapSingle(reqState0.req.nodeId(), reqState0.req);
+    }
+
+    /**
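+     * @param futId Future ID.
+     * @return {@code False} if the future ID has changed or the future was finished here, {@code true} otherwise.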
+     */
+    private boolean checkDhtNodes(Long futId) {
+        GridCacheReturn opRes0;
+        CachePartialUpdateCheckedException err0;
+        AffinityTopologyVersion remapTopVer0;
+
+        synchronized (mux) {
+            if (this.futId == null || !this.futId.equals(futId))
+                return false;
+
+            assert reqState != null;
+
+            if (reqState.checkDhtNodes(cctx)) {
+                opRes0 = opRes;
+                err0 = err;
+                remapTopVer0 = onAllReceived();
+            }
+            else
+                return true;
+        }
+
+        finishUpdateFuture(opRes0, err0, remapTopVer0);
+
+        return false;
     }
 
     /**
@@ -548,7 +584,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @return Request.
      * @throws Exception If failed.
      */
-    private GridNearAtomicAbstractUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, long futId)
+    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, long futId)
         throws Exception {
         if (key == null)
             throw new NullPointerException("Null key.");
@@ -565,15 +601,18 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         else
             val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
 
-        ClusterNode primary = cctx.affinity().primaryByKey(cacheKey, topVer);
+        boolean stableTop = cctx.topology().rebalanceFinished(topVer) &&
+            !cctx.discovery().hasNearCache(cctx.cacheId(), topVer);
 
-        if (primary == null)
+        List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer);
+
+        if (F.isEmpty(nodes))
             throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
                 "left the grid).");
 
-        GridNearAtomicAbstractUpdateRequest req;
+        ClusterNode primary = nodes.get(0);
 
-        boolean stableTop = true;
+        GridNearAtomicAbstractUpdateRequest req;
 
         if (canUseSingleRequest()) {
             if (op == TRANSFORM) {
@@ -660,7 +699,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             CU.EXPIRE_TIME_CALCULATE,
             null);
 
-        return req;
+        return new PrimaryRequestState(req, nodes, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index 5199602..c7c92dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -234,13 +234,13 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
         }
 
         switch (writer.state()) {
-            case 14:
+            case 12:
                 if (!writer.writeByteArray("entryProcessorBytes", entryProcessorBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 15:
+            case 13:
                 if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
@@ -262,7 +262,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
             return false;
 
         switch (reader.state()) {
-            case 14:
+            case 12:
                 entryProcessorBytes = reader.readByteArray("entryProcessorBytes");
 
                 if (!reader.isLastRead())
@@ -270,7 +270,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
 
                 reader.incrementState();
 
-            case 15:
+            case 13:
                 invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
 
                 if (!reader.isLastRead())
@@ -285,7 +285,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 16;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 42367b8..eff5e31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -257,13 +257,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         }
 
         switch (writer.state()) {
-            case 11:
+            case 10:
                 if (!writer.writeMessage("key", key))
                     return false;
 
                 writer.incrementState();
 
-            case 13:
+            case 11:
                 if (!writer.writeMessage("val", val))
                     return false;
 
@@ -285,7 +285,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
             return false;
 
         switch (reader.state()) {
-            case 11:
+            case 10:
                 key = reader.readMessage("key");
 
                 if (!reader.isLastRead())
@@ -293,7 +293,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
 
                 reader.incrementState();
 
-            case 13:
+            case 11:
                 val = reader.readMessage("val");
 
                 if (!reader.isLastRead())
@@ -321,7 +321,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 14;
+        return 12;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 629c447..16715a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
@@ -55,6 +56,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 
 /**
@@ -300,7 +302,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 if (opRes == null && res.hasResult())
                     opRes = res.result();
 
-                if (singleReq.onDhtResponse(cctx, nodeId, res)) {
+                if (singleReq.onDhtResponse(nodeId, res)) {
                     opRes0 = opRes;
                     err0 = err;
                     remapTopVer0 = onAllReceived();
@@ -315,7 +317,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     if (opRes == null && res.hasResult())
                         opRes = res.result();
 
-                    if (reqState.onDhtResponse(cctx, nodeId, res)) {
+                    if (reqState.onDhtResponse(nodeId, res)) {
                         assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
 
                         resCnt++;
@@ -371,7 +373,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 if (req == null)
                     return;
 
-                rcvAll = singleReq.onPrimaryResponse(cctx, res);
+                rcvAll = singleReq.onPrimaryResponse(res);
             }
             else {
                 if (mappings == null)
@@ -385,7 +387,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 req = reqState.processPrimaryResponse(nodeId);
 
                 if (req != null) {
-                    if (reqState.onPrimaryResponse(cctx, res)) {
+                    if (reqState.onPrimaryResponse(res)) {
                         assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
 
                         resCnt++;
@@ -766,16 +768,20 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         int size = keys.size();
 
         try {
+            boolean stableTop = cctx.topology().rebalanceFinished(topVer) &&
+                !cctx.discovery().hasNearCache(cctx.cacheId(), topVer);
+
             if (size == 1) {
                 assert remapKeys == null || remapKeys.size() == 1;
 
-                singleReq0 = mapSingleUpdate(topVer, futId);
+                singleReq0 = mapSingleUpdate(topVer, futId, stableTop);
             }
             else {
                 Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes,
                     topVer,
                     futId,
-                    remapKeys);
+                    remapKeys,
+                    stableTop);
 
                 if (pendingMappings.size() == 1)
                     singleReq0 = F.firstValue(pendingMappings);
@@ -806,6 +812,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                 return;
             }
+
+            if (stableTop && syncMode == FULL_SYNC && cctx.discovery().topologyVersion() != topVer.topologyVersion()) {
+                if (!checkDhtNodes(futId))
+                    return;
+            }
         }
         catch (Exception e) {
             err = e;
@@ -830,6 +841,59 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         }
     }
 
+    private boolean checkDhtNodes(Long futId) { // False: caller must stop; true: caller keeps mapping.
+        GridCacheReturn opRes0 = null;
+        CachePartialUpdateCheckedException err0 = null;
+        AffinityTopologyVersion remapTopVer0 = null;
+
+        synchronized (mux) {
+            if (this.futId == null || !this.futId.equals(futId))
+                return false; // Current future ID is not the one this check was started for.
+
+            if (singleReq != null) { // Single-request mode: only one request state to check.
+                if (singleReq.checkDhtNodes(cctx)) { // True is handled like the last awaited response.
+                    opRes0 = opRes;
+                    err0 = err;
+                    remapTopVer0 = onAllReceived();
+                }
+                else
+                    return true;
+            }
+            else {
+                if (mappings != null) {
+                    boolean rcvAll = false;
+
+                    for (PrimaryRequestState reqState : mappings.values()) {
+                        if (reqState.checkDhtNodes(cctx)) {
+                            assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+                            resCnt++; // Same counter the response handlers advance per completed mapping.
+
+                            if (mappings.size() == resCnt) { // Every mapping is now accounted for.
+                                rcvAll = true;
+
+                                opRes0 = opRes;
+                                err0 = err;
+                                remapTopVer0 = onAllReceived();
+
+                                break;
+                            }
+                        }
+                    }
+
+                    if (!rcvAll)
+                        return true; // Some mappings are still pending; nothing to finish here.
+                }
+                else
+                    return true; // No mappings recorded; nothing to check.
+            }
+        }
+
+        finishUpdateFuture(opRes0, err0, remapTopVer0); // Invoked after leaving synchronized (mux).
+
+        return false;
+    }
+
     /**
      * @return Future version.
      */
@@ -857,7 +921,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes,
         AffinityTopologyVersion topVer,
         Long futId,
-        @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+        @Nullable Collection<KeyCacheObject> remapKeys,
+        boolean stableTop) throws Exception {
         Iterator<?> it = null;
 
         if (vals != null)
@@ -875,8 +940,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size());
 
-        boolean stableTop = true;
-
         // Create mappings first, then send messages.
         for (Object key : keys) {
             if (key == null)
@@ -930,18 +993,20 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             else
                 val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
 
-            ClusterNode primary = cctx.affinity().primaryByKey(cacheKey, topVer);
+            List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer);
 
-            if (primary == null)
+            if (F.isEmpty(nodes))
                 throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                     "(all partition nodes left the grid).");
 
+            ClusterNode primary = nodes.get(0);
+
             UUID nodeId = primary.id();
 
             PrimaryRequestState mapped = pendingMappings.get(nodeId);
 
             if (mapped == null) {
-                mapped = new PrimaryRequestState(new GridNearAtomicFullUpdateRequest(
+                GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
                     cctx.cacheId(),
                     nodeId,
                     futId,
@@ -959,11 +1024,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     skipStore,
                     keepBinary,
                     cctx.deploymentEnabled(),
-                    keys.size()));
+                    keys.size());
+
+                mapped = new PrimaryRequestState(req, nodes, false);
 
                 pendingMappings.put(nodeId, mapped);
             }
 
+            if (mapped.req.dhtReplyToNear())
+                mapped.addMapping(nodes);
+
             mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
         }
 
@@ -976,7 +1046,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @return Request.
      * @throws Exception If failed.
      */
-    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId) throws Exception {
+    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean stableTop) throws Exception {
         Object key = F.first(keys);
 
         Object val;
@@ -1029,13 +1099,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         else
             val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
 
-        ClusterNode primary = cctx.affinity().primaryByPartition(cacheKey.partition(), topVer);
+        List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer);
 
-        if (primary == null)
-            throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
-                "left the grid).");
+        if (F.isEmpty(nodes))
+            throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                    "(all partition nodes left the grid).");
 
-        boolean stableTop = true;
+        ClusterNode primary = nodes.get(0);
 
         GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
             cctx.cacheId(),
@@ -1063,7 +1133,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             conflictExpireTime,
             conflictVer);
 
-        return new PrimaryRequestState(req);
+        return new PrimaryRequestState(req, nodes, true);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c4394f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 6bb8913..c2600a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -481,55 +481,55 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 writer.incrementState();
 
-            case 7:
+            case 6:
                 if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 7:
                 if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 8:
                 if (!writer.writeMessage("nearTtls", nearTtls))
                     return false;
 
                 writer.incrementState();
 
-            case 10:
+            case 9:
                 if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 11:
+            case 10:
                 if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
-            case 12:
+            case 11:
                 if (!writer.writeMessage("nearVer", nearVer))
                     return false;
 
                 writer.incrementState();
 
-            case 13:
+            case 12:
                 if (!writer.writeInt("partId", partId))
                     return false;
 
                 writer.incrementState();
 
-            case 14:
+            case 13:
                 if (!writer.writeMessage("remapTopVer", remapTopVer))
                     return false;
 
                 writer.incrementState();
 
-            case 15:
+            case 14:
                 if (!writer.writeMessage("ret", ret))
                     return false;
 
@@ -575,7 +575,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
-            case 7:
+            case 6:
                 nearExpireTimes = reader.readMessage("nearExpireTimes");
 
                 if (!reader.isLastRead())
@@ -583,7 +583,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
-            case 8:
+            case 7:
                 nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
@@ -591,7 +591,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
-            case 9:
+            case 8:
                 nearTtls = reader.readMessage("nearTtls");
 
                 if (!reader.isLastRead())
@@ -599,7 +599,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
-            case 10:
+            case 9:
                 nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -607,7 +607,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
-            case 11:
+            case 10:
                 nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
@@ -615,7 +615,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
-            case 12:
+            case 11:
                 nearVer = reader.readMessage("nearVer");
 
                 if (!reader.isLastRead())
@@ -623,7 +623,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
-            case 13:
+            case 12:
                 partId = reader.readInt("partId");
 
                 if (!reader.isLastRead())
@@ -631,7 +631,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
-            case 14:
+            case 13:
                 remapTopVer = reader.readMessage("remapTopVer");
 
                 if (!reader.isLastRead())
@@ -639,7 +639,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
-            case 15:
+            case 14:
                 ret = reader.readMessage("ret");
 
                 if (!reader.isLastRead())
@@ -659,7 +659,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 16;
+        return 15;
     }
 
     /** {@inheritDoc} */


Mime
View raw message