ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: ignite-4705
Date Fri, 03 Mar 2017 14:07:02 GMT
ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4ab159de
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4ab159de
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4ab159de

Branch: refs/heads/ignite-4705-2
Commit: 4ab159def9fb73eb9082d5954336f95c5cb5fdf7
Parents: 6c4394f
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Mar 3 10:54:34 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Mar 3 17:06:53 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   6 +
 .../processors/cache/GridCacheReturn.java       |   6 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      | 142 +++++++---------
 .../GridDhtAtomicAbstractUpdateRequest.java     |  11 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 102 ++++++------
 .../dht/atomic/GridDhtAtomicNearResponse.java   |   6 +-
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |  13 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  12 +-
 ...idNearAtomicAbstractSingleUpdateRequest.java |   4 +-
 .../GridNearAtomicAbstractUpdateFuture.java     | 143 ++++++++++++++--
 .../GridNearAtomicAbstractUpdateRequest.java    |  28 ++--
 .../GridNearAtomicCheckUpdateRequest.java       | 165 +++++++++++++++++++
 .../atomic/GridNearAtomicFullUpdateRequest.java |   4 +-
 ...GridNearAtomicSingleUpdateFilterRequest.java |   4 +-
 .../GridNearAtomicSingleUpdateFuture.java       |  84 +++++-----
 ...GridNearAtomicSingleUpdateInvokeRequest.java |   7 +-
 .../GridNearAtomicSingleUpdateRequest.java      |   4 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 126 ++++++--------
 .../atomic/GridNearAtomicUpdateResponse.java    | 100 ++++-------
 .../distributed/dht/atomic/UpdateErrors.java    |  14 +-
 .../atomic/IgniteCacheAtomicProtocolTest.java   |  59 ++++++-
 21 files changed, 670 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index fa59291..0548581 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicCheckUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
@@ -175,6 +176,11 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -47:
+                msg = new GridNearAtomicCheckUpdateRequest();
+
+                break;
+
             case -46:
                 msg = new UpdateErrors();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
index 80f43fa..c5d4066 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
@@ -126,12 +126,10 @@ public class GridCacheReturn implements Externalizable, Message {
     }
 
     /**
-     * Checks if value is not {@code null}.
      *
-     * @return {@code True} if value is not {@code null}.
      */
-    public boolean hasValue() {
-        return invokeRes || v != null || cacheObj != null;
+    public boolean emptyResult() {
+        return !invokeRes && v  == null && cacheObj == null && success;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 39059ff..5e01726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -35,6 +36,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -50,7 +52,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
  * DHT atomic cache backup update future.
@@ -126,6 +128,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         }
     }
 
+    protected abstract boolean allUpdated();
+
     /** {@inheritDoc} */
     @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
         boolean waitForExchange = !updateReq.topologyLocked();
@@ -324,8 +328,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
         GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
 
-//        boolean needReplyToNear = false;
-
         if (req != null) {
             synchronized (this) {
                 if (req.onResponse()) {
@@ -338,50 +340,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                 else
                     return false;
             }
-//
-//            if (needReplyToNear) {
-//                assert !F.isEmpty(mappings);
-//
-//                List<UUID> dhtNodes = new ArrayList<>(mappings.size());
-//
-//                dhtNodes.addAll(mappings.keySet());
-//
-//                GridDhtAtomicNearResponse res = new GridDhtAtomicNearResponse(cctx.cacheId(),
-//                    req.partition(),
-//                    req.nearFutureId(),
-//                    cctx.localNodeId(),
-//                    req.flags());
-//
-//                res.errors(errors);
-//
-//                try {
-//                    cctx.io().send(req.nearNodeId(), res, cctx.ioPolicy());
-//
-//                    if (msgLog.isDebugEnabled()) {
-//                        msgLog.debug("DTH update fut, sent response on DHT node fail " +
-//                            "[futId=" + futId +
-//                            ", writeVer=" + writeVer +
-//                            ", node=" + req.nearNodeId() +
-//                            ", failedNode=" + nodeId + ']');
-//                    }
-//                }
-//                catch (ClusterTopologyCheckedException ignored) {
-//                    if (msgLog.isDebugEnabled()) {
-//                        msgLog.debug("DTH update fut, failed to notify near node on DHT node fail, near node left " +
-//                            "[futId=" + futId +
-//                            ", writeVer=" + writeVer +
-//                            ", node=" + req.nearNodeId() +
-//                            ", failedNode=" + nodeId + ']');
-//                    }
-//                }
-//                catch (IgniteCheckedException ignored) {
-//                    U.error(msgLog, "DTH update fut, failed to notify near node on DHT node fail " +
-//                        "[futId=" + futId +
-//                        ", writeVer=" + writeVer +
-//                        ", node=" + req.nearNodeId() +
-//                        ", failedNode=" + nodeId + ']');
-//                }
-//            }
 
             if (resCnt0 == mappings.size())
                 onDone();
@@ -399,6 +357,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      */
     final void map(ClusterNode nearNode, GridCacheReturn ret) {
         if (F.isEmpty(mappings)) {
+            updateRes.dhtNodes(Collections.<UUID>emptyList());
+
             completionCb.apply(updateReq, updateRes);
 
             onDone();
@@ -406,42 +366,54 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
             return;
         }
 
-        boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
+        boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
+            !ret.emptyResult() ||
+            updateRes.nearVersion() != null;
 
-        if (updateReq.dhtReplyToNear()) {
-            assert fullSync;
+        boolean needMapping = updateReq.fullSync() && (!updateReq.mappingKnown() || !allUpdated());
 
-            boolean needReplyToNear = ret.hasValue() || updateRes.nearVersion() != null;
+        if (needMapping) {
+            initMapping(updateRes);
 
-            sendDhtRequests(true, nearNode, ret);
-
-            if (needReplyToNear)
-                completionCb.apply(updateReq, updateRes);
+            needReplyToNear = true;
         }
-        else {
-            sendDhtRequests(false, nearNode, ret);
 
-            if (!fullSync)
-                completionCb.apply(updateReq, updateRes);
+        sendDhtRequests(nearNode, ret);
+
+        if (needReplyToNear)
+            completionCb.apply(updateReq, updateRes);
+    }
+
+    private void initMapping(GridNearAtomicUpdateResponse updateRes) {
+        List<UUID> dhtNodes;
+
+        if (!F.isEmpty(mappings)) {
+            dhtNodes = new ArrayList<>(mappings.size());
+
+            dhtNodes.addAll(mappings.keySet());
         }
+        else
+            dhtNodes = Collections.emptyList();
+
+        updateRes.dhtNodes(dhtNodes);
     }
 
     /**
-     * @param nearReplyInfo {@code True} if need add information for near node response.
+     * @param nearNode Near node.
      * @param ret Return value.
      */
-    private void sendDhtRequests(boolean nearReplyInfo, ClusterNode nearNode, GridCacheReturn ret) {
+    private void sendDhtRequests(ClusterNode nearNode, GridCacheReturn ret) {
         for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
             try {
-                if (nearReplyInfo) {
+                assert !cctx.localNodeId().equals(req.nodeId()) : req;
+
+                if (updateReq.fullSync()) {
                     req.nearReplyInfo(nearNode.id(), updateReq.futureId());
 
-                    if (!ret.hasValue())
-                        req.setResult(ret.success());
+                    if (ret.emptyResult())
+                        req.hasResult(true);
                 }
 
-                assert !cctx.localNodeId().equals(req.nodeId()) : req;
-
                 cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
                 if (msgLog.isDebugEnabled()) {
@@ -467,27 +439,36 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /**
-     * @param nodeId Node ID.
-     * @param res Response.
-     */
-    public final void onDhtResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
-        assert !updateReq.dhtReplyToNear();
-
-        registerResponse(nodeId);
-    }
-
-    /**
      * Deferred update response.
      *
      * @param nodeId Backup node ID.
      */
-    public final void onDeferredResponse(UUID nodeId) {
+    final void onDeferredResponse(UUID nodeId) {
         if (log.isDebugEnabled())
             log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
 
         registerResponse(nodeId);
     }
 
+    final void onDhtResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
+        if (!F.isEmpty(res.nearEvicted())) {
+            for (KeyCacheObject key : res.nearEvicted()) {
+                try {
+                    GridDhtCacheEntry entry = (GridDhtCacheEntry)cctx.cache().peekEx(key);
+
+                    if (entry != null)
+                        entry.removeReader(nodeId, res.messageId());
+                }
+                catch (GridCacheEntryRemovedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Entry with evicted reader was removed [key=" + key + ", err=" + e + ']');
+                }
+            }
+        }
+
+        registerResponse(nodeId);
+    }
+
     /**
      * @param updateRes Response.
      * @param err Error.
@@ -528,13 +509,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     clsr.apply(suc);
             }
 
-            if (updateReq.writeSynchronizationMode() == FULL_SYNC && !updateReq.dhtReplyToNear()) {
-                if (!suc)
-                    addFailedKeys(updateRes, err);
-
-                completionCb.apply(updateReq, updateRes);
-            }
-
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index e86c3a5..34ac2ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -55,9 +55,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     /** */
     static final int DHT_ATOMIC_HAS_RESULT_MASK = 0x08;
 
-    /** */
-    static final int DHT_ATOMIC_RESULT_SUCCESS_MASK = 0x10;
-
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
@@ -145,12 +142,10 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     }
 
     /**
-     * @param success Success flag.
+     * @param res Result flag.
      */
-    public void setResult(boolean success) {
-        setFlag(true, DHT_ATOMIC_HAS_RESULT_MASK);
-
-        setFlag(success, DHT_ATOMIC_RESULT_SUCCESS_MASK);
+    public void hasResult(boolean res) {
+        setFlag(res, DHT_ATOMIC_HAS_RESULT_MASK);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index ca78186..fc0e16c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException;
 import org.apache.ignite.internal.processors.cache.EntryGetResult;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -107,8 +108,6 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
@@ -378,6 +377,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
+        ctx.io().addHandler(ctx.cacheId(),
+            GridNearAtomicCheckUpdateRequest.class,
+            new CI2<UUID, GridNearAtomicCheckUpdateRequest>() {
+                @Override public void apply(UUID uuid, GridNearAtomicCheckUpdateRequest msg) {
+                    processCheckUpdateRequest(uuid, msg);
+                }
+
+                @Override public String toString() {
+                    return "GridNearAtomicCheckUpdateRequest handler " +
+                        "[msgIdx=" + GridNearAtomicCheckUpdateRequest.CACHE_MSG_IDX + ']';
+                }
+            });
+
         if (near == null) {
             ctx.io().addHandler(
                 ctx.cacheId(),
@@ -1984,9 +1996,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         for (int i = 0; i < locked.size(); i++) {
             GridDhtCacheEntry entry = locked.get(i);
 
-            if (entry == null)
-                continue;
-
             try {
                 if (!checkFilter(entry, req, res)) {
                     if (expiry != null && entry.hasValue()) {
@@ -2784,7 +2793,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         catch (IgniteCheckedException e) {
-            res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e, ctx);
+            res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e);
         }
 
         if (storeErr != null) {
@@ -2793,7 +2802,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             for (Object failedKey : storeErr.failedKeys())
                 failed.add(ctx.toCacheKeyObject(failedKey));
 
-            res.addFailedKeys(failed, storeErr.getCause(), ctx);
+            res.addFailedKeys(failed, storeErr.getCause());
         }
 
         return dhtFut;
@@ -3065,6 +3074,32 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param checkReq Request.
+     */
+    private void processCheckUpdateRequest(UUID nodeId, GridNearAtomicCheckUpdateRequest checkReq) {
+        /**
+         * Message is processed in the same stripe, so primary already processed update request. It is possible
+         * response was not sent if operation result was empty. Near node will get original response or this one.
+         */
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+            nodeId,
+            checkReq.futureId(),
+            false);
+
+        GridCacheReturn ret = new GridCacheReturn(false, true);
+
+        res.returnValue(ret);
+
+        try {
+            ctx.io().send(nodeId, res, ctx.ioPolicy());
+        }
+        catch (IgniteCheckedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
      * @param nodeId Sender node ID.
      * @param req Dht atomic update request.
      */
@@ -3078,7 +3113,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         GridCacheVersion ver = req.writeVersion();
 
-        GridDhtAtomicUpdateResponse dhtRes = null;
         GridDhtAtomicNearResponse nearRes = null;
 
         if (req.nearNodeId() != null) {
@@ -3088,12 +3122,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 nodeId,
                 req.flags());
         }
-        else if (req.writeSynchronizationMode() == FULL_SYNC) {
-            dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
-                req.partition(),
-                req.futureId(),
-                ctx.deploymentEnabled());
-        }
 
         boolean replicate = ctx.isDrEnabled();
 
@@ -3182,40 +3210,34 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 if (nearRes != null)
                     nearRes.addFailedKey(key, err);
-                else if (dhtRes != null)
-                    dhtRes.addFailedKey(key, err);
 
                 U.error(log, "Failed to update key on backup node: " + key, e);
             }
         }
 
+        GridDhtAtomicUpdateResponse dhtRes = null;
+
         if (isNearEnabled(cacheCfg)) {
             List<KeyCacheObject> nearEvicted =
                 ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes);
 
             if (nearEvicted != null) {
-                if (dhtRes == null) {
-                    dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
-                        req.partition(),
-                        req.futureId(),
-                        ctx.deploymentEnabled());
-                }
+                dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+                    req.partition(),
+                    req.futureId(),
+                    ctx.deploymentEnabled());
 
                 dhtRes.nearEvicted(nearEvicted);
             }
         }
 
-        if (nearRes != null) {
+        if (nearRes != null)
             sendDhtNearResponse(req, nearRes);
 
+        if (dhtRes != null)
+            sendDhtPrimaryResponse(nodeId, req, dhtRes);
+        else
             sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
-        }
-        else {
-            if (dhtRes != null)
-                sendDhtPrimaryResponse(nodeId, req, dhtRes);
-            else
-                sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
-        }
     }
 
     /**
@@ -3459,24 +3481,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("unchecked")
     private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
-        if (!F.isEmpty(res.nearEvicted())) {
-            for (KeyCacheObject key : res.nearEvicted()) {
-                try {
-                    GridDhtCacheEntry entry = (GridDhtCacheEntry)ctx.cache().peekEx(key);
-
-                    if (entry != null)
-                        entry.removeReader(nodeId, res.messageId());
-                }
-                catch (GridCacheEntryRemovedException e) {
-                    if (log.isDebugEnabled())
-                        log.debug("Entry with evicted reader was removed [key=" + key + ", err=" + e + ']');
-                }
-            }
-        }
-
-
         GridDhtAtomicAbstractUpdateFuture updateFut =
-                (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
+            (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
 
         if (updateFut != null) {
             if (msgLog.isDebugEnabled()) {
@@ -3488,7 +3494,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
         else {
             U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureId() +
-                    ", node=" + nodeId + ", res=" + res + ']');
+                ", node=" + nodeId + ", res=" + res + ']');
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index b2cc6e3..f32fed9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -31,7 +31,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_HAS_RESULT_MASK;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_RESULT_SUCCESS_MASK;
 
 /**
  * TODO IGNITE-4705: no not send mapping if it == affinity?
@@ -131,7 +130,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
     public GridCacheReturn result() {
         assert hasResult() : this;
 
-        return new GridCacheReturn(true, isFlag(DHT_ATOMIC_RESULT_SUCCESS_MASK));
+        return new GridCacheReturn(true, true);
     }
 
     /**
@@ -312,7 +311,6 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
     @Override public String toString() {
         return S.toString(GridDhtAtomicNearResponse.class, this,
             "flags",
-            "res=" + isFlag(DHT_ATOMIC_HAS_RESULT_MASK) +
-            "|resOk=" + isFlag(DHT_ATOMIC_RESULT_SUCCESS_MASK));
+            "res=" + isFlag(DHT_ATOMIC_HAS_RESULT_MASK));
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index e393322..0566ce4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -39,6 +39,9 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private boolean allUpdated;
+
     /**
      * @param cctx Cache context.
      * @param writeVer Write version.
@@ -55,9 +58,17 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
     }
 
     /** {@inheritDoc} */
+    @Override protected boolean allUpdated() {
+        return allUpdated;
+    }
+
+    /** {@inheritDoc} */
     @Override protected void addDhtKey(KeyCacheObject key, List<ClusterNode> dhtNodes) {
-        if (mappings == null)
+        if (mappings == null) {
+            allUpdated = true;
+
             mappings = U.newHashMap(dhtNodes.size());
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index ed57cf0..49e168a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -38,6 +38,9 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private int updateCntr;
+
     /**
      * @param cctx Cache context.
      * @param writeVer Write version.
@@ -56,8 +59,15 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     }
 
     /** {@inheritDoc} */
+    @Override protected boolean allUpdated() {
+        return updateCntr == updateReq.size();
+    }
+
+    /** {@inheritDoc} */
     @Override protected void addDhtKey(KeyCacheObject key, List<ClusterNode> dhtNodes) {
-        // No-op.
+        assert updateCntr < updateReq.size();
+
+        updateCntr++;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 3a9055e..6811236 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@ -73,7 +73,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
         boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean stableTop,
+        boolean mappingKnown,
         boolean skipStore,
         boolean keepBinary,
         boolean addDepInfo
@@ -88,7 +88,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
             retval,
             subjId,
             taskNameHash,
-            stableTop,
+            mappingKnown,
             skipStore,
             keepBinary,
             addDepInfo);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index e174bd7..5369d53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -29,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
@@ -37,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -46,6 +50,8 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
  * Base for near atomic update futures.
@@ -199,6 +205,10 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         this.remapCnt = remapCnt;
     }
 
+    void sendCheckUpdateRequest(GridNearAtomicCheckUpdateRequest req) {
+
+    }
+
     /**
      * Performs future mapping.
      */
@@ -303,6 +313,48 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      */
     public abstract void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res);
 
+    final void onPrimaryError(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
+        assert res.error() != null;
+
+        if (err == null)
+            err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+        Collection<Object> keys = new ArrayList<>(res.failedKeys().size());
+
+        for (KeyCacheObject key : res.failedKeys())
+            keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+
+        err.add(keys, res.error(), req.topologyVersion());
+    }
+
+    /**
+     * @param req Request.
+     * @return Response to notify about primary failure.
+     */
+    final GridNearAtomicUpdateResponse primaryFailedResponse(GridNearAtomicAbstractUpdateRequest req) {
+        assert req.response() == null : req;
+        assert req.nodeId() != null : req;
+
+        if (msgLog.isDebugEnabled()) {
+            msgLog.debug("Near update fut, node left [futId=" + req.futureId() +
+                ", node=" + req.nodeId() + ']');
+        }
+
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+            req.nodeId(),
+            req.futureId(),
+            cctx.deploymentEnabled());
+
+        ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+            "before response is received: " + req.nodeId());
+
+        e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+        res.addFailedKeys(req.keys(), e);
+
+        return res;
+    }
+
     /**
      * @param req Request.
      * @param e Error.
@@ -330,6 +382,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         private Set<UUID> dhtNodes;
 
         /** */
+        private Set<UUID> rcvd;
+
+        /** */
         private boolean hasRes;
 
         /**
@@ -340,7 +395,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
             this.req = req;
 
-            if (req.dhtReplyToNear()) {
+            if (req.initMappingLocally()) {
                 if (single) {
                     if (nodes.size() > 1) {
                         dhtNodes = U.newHashSet(nodes.size() - 1);
@@ -360,14 +415,20 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             }
         }
 
+        UUID primaryId() {
+            return req.nodeId();
+        }
+
         void addMapping(List<ClusterNode> nodes) {
-            assert req.dhtReplyToNear();
+            assert req.initMappingLocally();
 
             for (int i = 1; i < nodes.size(); i++)
                 dhtNodes.add(nodes.get(i).id());
         }
 
         boolean checkDhtNodes(GridCacheContext cctx) {
+            assert req.initMappingLocally() : req;
+
             if (finished())
                 return false;
 
@@ -394,7 +455,10 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @return {@code True} if all expected responses are received.
          */
         private boolean finished() {
-            return req.dhtReplyToNear() ? (dhtNodes.isEmpty() && hasRes) : hasRes;
+            if (req.writeSynchronizationMode() == PRIMARY_SYNC)
+                return hasRes;
+
+            return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes;
         }
 
         /**
@@ -415,11 +479,18 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @param nodeId Node ID.
          * @return {@code True} if request processing finished.
          */
-        boolean onNodeLeft(UUID nodeId) {
-            if (!req.dhtReplyToNear() || finished())
-                return false;
+        DhtLeftResult onDhtNodeLeft(UUID nodeId) {
+            if (req.writeSynchronizationMode() != FULL_SYNC || dhtNodes == null || finished())
+                return DhtLeftResult.NOT_DONE;
+
+            if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) {
+                if (hasRes)
+                    return DhtLeftResult.DONE;
+                else
+                    return req.mappingKnown() ? DhtLeftResult.ALL_RCVD_CHECK_UPDATE : DhtLeftResult.NOT_DONE;
+            }
 
-            return dhtNodes.remove(nodeId) && finished();
+            return DhtLeftResult.NOT_DONE;
         }
 
         /**
@@ -430,7 +501,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @return {@code True} if request processing finished.
          */
         boolean onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
-            assert req.dhtReplyToNear();
+            assert req.writeSynchronizationMode() == FULL_SYNC : req;
 
             if (finished())
                 return false;
@@ -438,14 +509,23 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             if (res.hasResult())
                 hasRes = true;
 
-            return dhtNodes != null && dhtNodes.remove(nodeId) && finished();
+            if (dhtNodes == null) {
+                if (rcvd == null)
+                    rcvd = new HashSet<>();
+
+                rcvd.add(nodeId);
+
+                return false;
+            }
+
+            return dhtNodes.remove(nodeId) && finished();
         }
 
         /**
          * @param res Response.
          * @return {@code True} if request processing finished.
          */
-        boolean onPrimaryResponse(GridNearAtomicUpdateResponse res) {
+        boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) {
             assert !finished() : this;
 
             hasRes = true;
@@ -454,14 +534,41 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
             assert onRes;
 
-            if (res.error() != null || res.remapTopologyVersion() != null)
+            if (res.error() != null || res.remapTopologyVersion() != null) {
+                dhtNodes = Collections.emptySet(); // Mark as finished.
+
                 return true;
+            }
 
             assert res.returnValue() != null : res;
 
+            if (res.dhtNodes() != null)
+                initDhtNodes(res.dhtNodes(), cctx);
+
             return finished();
         }
 
+        private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) {
+            assert dhtNodes == null || req.initMappingLocally();
+
+            dhtNodes = null;
+
+            for (UUID dhtNodeId : nodeIds) {
+                if ((rcvd != null && rcvd.contains(dhtNodeId)))
+                    continue;
+
+                if (cctx.discovery().node(dhtNodeId) != null) {
+                    if (dhtNodes == null)
+                        dhtNodes = U.newHashSet(nodeIds.size());
+
+                    dhtNodes.add(dhtNodeId);
+                }
+            }
+
+            if (dhtNodes == null)
+                dhtNodes = Collections.emptySet();
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(PrimaryRequestState.class, this,
@@ -470,6 +577,20 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         }
     }
 
+    /**
+     *
+     */
+    enum DhtLeftResult {
+        /** */
+        DONE,
+
+        /** */
+        NOT_DONE,
+
+        /** */
+        ALL_RCVD_CHECK_UPDATE
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNearAtomicAbstractUpdateFuture.class, this, super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 64839d9..a833588 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -46,8 +46,8 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
-    /** Stable topology flag mask. */
-    private static final int STABLE_TOP_FLAG_MASK = 0x01;
+    /** . */
+    private static final int MAPPING_KNOWN_FLAG_MASK = 0x01;
 
     /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
     private static final int TOP_LOCKED_FLAG_MASK = 0x02;
@@ -125,7 +125,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
         boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean stableTop,
+        boolean mappingKnown,
         boolean skipStore,
         boolean keepBinary,
         boolean addDepInfo
@@ -140,8 +140,8 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
         this.taskNameHash = taskNameHash;
         this.addDepInfo = addDepInfo;
 
-        if (stableTop)
-            stableTopology(true);
+        if (mappingKnown)
+            mappingKnown(true);
 
         if (topLocked)
             topologyLocked(true);
@@ -176,16 +176,22 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
         return ctx.atomicMessageLogger();
     }
 
-    boolean stableTopology() {
-        return isFlag(STABLE_TOP_FLAG_MASK);
+    boolean initMappingLocally() {
+        return mappingKnown() && fullSync();
     }
 
-    boolean dhtReplyToNear() {
-        return stableTopology() && syncMode == CacheWriteSynchronizationMode.FULL_SYNC;
+    boolean mappingKnown() {
+        return isFlag(MAPPING_KNOWN_FLAG_MASK);
     }
 
-    void stableTopology(boolean stableTop) {
-        setFlag(stableTop, STABLE_TOP_FLAG_MASK);
+    boolean fullSync() {
+        assert syncMode != null;
+
+        return syncMode == CacheWriteSynchronizationMode.FULL_SYNC;
+    }
+
+    void mappingKnown(boolean stableTop) {
+        setFlag(stableTop, MAPPING_KNOWN_FLAG_MASK);
     }
 
     public int taskNameHash() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
new file mode 100644
index 0000000..030abdf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class GridNearAtomicCheckUpdateRequest extends GridCacheMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    @GridDirectTransient
+    private GridNearAtomicAbstractUpdateRequest updateReq;
+
+    /** */
+    private int partId;
+
+    /** */
+    private long futId;
+
+    /**
+     *
+     */
+    public GridNearAtomicCheckUpdateRequest() {
+        // No-op.
+    }
+
+    GridNearAtomicCheckUpdateRequest(int cacheId, GridNearAtomicAbstractUpdateRequest updateReq, int partId, long futId) {
+        assert updateReq != null;
+        assert partId >= 0 : partId;
+
+        this.cacheId = cacheId;
+        this.updateReq = updateReq;
+        this.partId = partId;
+        this.futId = futId;
+    }
+
+    long futureId() {
+        return futId;
+    }
+
+    GridNearAtomicAbstractUpdateRequest updateRequest() {
+        return updateReq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -47;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeInt("partId", partId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                partId = reader.readInt("partId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicCheckUpdateRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicCheckUpdateRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 81b89e0..b2eb2bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -152,7 +152,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean stableTop,
+        boolean mappingKnown,
         boolean skipStore,
         boolean keepBinary,
         boolean addDepInfo,
@@ -168,7 +168,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
             retval,
             subjId,
             taskNameHash,
-            stableTop,
+            mappingKnown,
             skipStore,
             keepBinary,
             addDepInfo);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index b078b8d..5934d41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -83,7 +83,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean stableTop,
+        boolean mappingKnown,
         boolean skipStore,
         boolean keepBinary,
         boolean addDepInfo
@@ -99,7 +99,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
             retval,
             subjId,
             taskNameHash,
-            stableTop,
+            mappingKnown,
             skipStore,
             keepBinary,
             addDepInfo

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index b4f88aa..82d397d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -60,7 +60,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     private Object key;
 
     /** Values. */
-    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Object val;
 
     /** */
@@ -135,50 +134,55 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
-        GridNearAtomicAbstractUpdateRequest req;
-        GridNearAtomicUpdateResponse res = null;
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
         AffinityTopologyVersion remapTopVer0 = null;
 
+        GridNearAtomicCheckUpdateRequest checkReq = null;
+
         synchronized (mux) {
             if (reqState == null)
                 return false;
 
-            req = reqState.processPrimaryResponse(nodeId);
+            boolean rcvAll = false;
 
-            if (req != null) {
-                res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
-                    nodeId,
-                    req.futureId(),
-                    cctx.deploymentEnabled());
+            if (reqState.req.nodeId.equals(nodeId)) {
+                GridNearAtomicAbstractUpdateRequest req = reqState.processPrimaryResponse(nodeId);
+
+                if (req != null) {
+                    GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
 
-                ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
-                    "before response is received: " + nodeId);
+                    rcvAll = true;
 
-                e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+                    reqState.onPrimaryResponse(res, cctx);
 
-                res.addFailedKeys(req.keys(), e);
+                    onPrimaryError(req, res);
+                }
             }
             else {
-                if (reqState.onNodeLeft(nodeId)) {
-                    opRes0 = opRes;
-                    err0 = err;
-                    remapTopVer0 = onAllReceived();
+                DhtLeftResult res = reqState.onDhtNodeLeft(nodeId);
+
+                if (res == DhtLeftResult.DONE)
+                    rcvAll = true;
+                else if (res == DhtLeftResult.ALL_RCVD_CHECK_UPDATE) {
+                    checkReq = new GridNearAtomicCheckUpdateRequest(cctx.cacheId(),
+                        reqState.req,
+                        reqState.req.partition(),
+                        futId);
                 }
                 else
                     return false;
             }
-        }
 
-        if (res != null) {
-            if (msgLog.isDebugEnabled()) {
-                msgLog.debug("Near update single fut, node left [futId=" + req.futureId() +
-                    ", node=" + nodeId + ']');
+            if (rcvAll) {
+                opRes0 = opRes;
+                err0 = err;
+                remapTopVer0 = onAllReceived();
             }
-
-            onPrimaryResponse(nodeId, res, true);
         }
+
+        if (checkReq != null)
+            sendCheckUpdateRequest(checkReq);
         else
             finishUpdateFuture(opRes0, err0, remapTopVer0);
 
@@ -281,22 +285,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
                 remapTopVer = res.remapTopologyVersion();
             }
-            else if (res.error() != null) {
-                if (res.failedKeys() != null) {
-                    assert res.failedKeys().size() == 1 : res.failedKeys();
-
-                    if (err == null)
-                        err = new CachePartialUpdateCheckedException(
-                            "Failed to update keys (retry update if possible).");
-
-                    Collection<Object> keys = new ArrayList<>(res.failedKeys().size());
-
-                    for (KeyCacheObject key : res.failedKeys())
-                        keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
-
-                    err.add(keys, res.error(), req.topologyVersion());
-                }
-            }
+            else if (res.error() != null)
+                onPrimaryError(req, res);
             else {
                 GridCacheReturn ret = res.returnValue();
 
@@ -317,7 +307,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
                 assert reqState != null;
 
-                if (!reqState.onPrimaryResponse(res))
+                if (!reqState.onPrimaryResponse(res, cctx))
                     return;
             }
 
@@ -515,7 +505,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 return;
             }
 
-            if (reqState.req.dhtReplyToNear() && (cctx.discovery().topologyVersion() != topVer.topologyVersion())) {
+            if (reqState.req.initMappingLocally() && (cctx.discovery().topologyVersion() != topVer.topologyVersion())) {
                 if (!checkDhtNodes(futId))
                     return;
             }
@@ -601,7 +591,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         else
             val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
 
-        boolean stableTop = cctx.topology().rebalanceFinished(topVer) &&
+        boolean mappingKnown = cctx.topology().rebalanceFinished(topVer) &&
             !cctx.discovery().hasNearCache(cctx.cacheId(), topVer);
 
         List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer);
@@ -628,7 +618,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                     invokeArgs,
                     subjId,
                     taskNameHash,
-                    stableTop,
+                    mappingKnown,
                     skipStore,
                     keepBinary,
                     cctx.deploymentEnabled());
@@ -646,7 +636,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                         retval,
                         subjId,
                         taskNameHash,
-                        stableTop,
+                        mappingKnown,
                         skipStore,
                         keepBinary,
                         cctx.deploymentEnabled());
@@ -664,7 +654,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                         filter,
                         subjId,
                         taskNameHash,
-                        stableTop,
+                        mappingKnown,
                         skipStore,
                         keepBinary,
                         cctx.deploymentEnabled());
@@ -686,7 +676,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 filter,
                 subjId,
                 taskNameHash,
-                stableTop,
+                mappingKnown,
                 skipStore,
                 keepBinary,
                 cctx.deploymentEnabled(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index c7c92dd..009b7c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -101,7 +101,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
         @Nullable Object[] invokeArgs,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean stableTop,
+        boolean mappingKnown,
         boolean skipStore,
         boolean keepBinary,
         boolean addDepInfo
@@ -117,14 +117,15 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
             retval,
             subjId,
             taskNameHash,
-            stableTop,
+            mappingKnown,
             skipStore,
             keepBinary,
             addDepInfo
         );
-        this.invokeArgs = invokeArgs;
 
         assert op == TRANSFORM : op;
+
+        this.invokeArgs = invokeArgs;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index eff5e31..2fd4f8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -92,7 +92,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean stableTop,
+        boolean mappingKnown,
         boolean skipStore,
         boolean keepBinary,
         boolean addDepInfo
@@ -107,7 +107,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
             retval,
             subjId,
             taskNameHash,
-            stableTop,
+            mappingKnown,
             skipStore,
             keepBinary,
             addDepInfo);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 16715a2..7b1c530 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -160,7 +160,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     @Override public boolean onNodeLeft(UUID nodeId) {
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
-        GridNearAtomicUpdateResponse res = null;
         AffinityTopologyVersion remapTopVer0 = null;
 
         boolean rcvAll = false;
@@ -170,35 +169,56 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 return false;
 
             if (singleReq != null) {
-                GridNearAtomicAbstractUpdateRequest req = singleReq.processPrimaryResponse(nodeId);
+                if (singleReq.req.nodeId.equals(nodeId)) {
+                    GridNearAtomicAbstractUpdateRequest req = singleReq.processPrimaryResponse(nodeId);
 
-                if (req == null) {
-                    if (singleReq.onNodeLeft(nodeId)) {
-                        rcvAll = true;
+                    if (req != null) {
+                        GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
 
-                        opRes0 = opRes;
-                        err0 = err;
-                        remapTopVer0 = onAllReceived();
+                        singleReq.onPrimaryResponse(res, cctx);
+
+                        onPrimaryError(req, res);
                     }
                 }
-                else
-                    res = primaryFailedResponse(req, nodeId);
+                else {
+                    singleReq.onDhtNodeLeft(nodeId);
+                }
+
+                if (rcvAll) {
+                    opRes0 = opRes;
+                    err0 = err;
+                    remapTopVer0 = onAllReceived();
+                }
             }
             else {
                 if (mappings == null)
                     return false;
 
-                PrimaryRequestState reqState = mappings.get(nodeId);
+                for (Map.Entry<UUID, PrimaryRequestState> e : mappings.entrySet()) {
+                    assert e.getKey().equals(e.getValue().req.nodeId());
 
-                if (reqState != null) {
-                    GridNearAtomicAbstractUpdateRequest req = reqState.processPrimaryResponse(nodeId);
+                    PrimaryRequestState reqState = e.getValue();
 
-                    if (req != null)
-                        res = primaryFailedResponse(req, nodeId);
-                }
+                    boolean reqDone = false;
+
+                    if (e.getKey().equals(nodeId)) {
+                        GridNearAtomicAbstractUpdateRequest req = reqState.processPrimaryResponse(nodeId);
+
+                        if (req != null) {
+                            reqDone = true;
+
+                            GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
 
-                for (PrimaryRequestState reqState0 : mappings.values()) {
-                    if (reqState0.onNodeLeft(nodeId)) {
+                            reqState.onPrimaryResponse(res, cctx);
+
+                            onPrimaryError(req, res);
+                        }
+                    }
+                    else {
+                        reqState.onDhtNodeLeft(nodeId);
+                    }
+
+                    if (reqDone) {
                         assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
 
                         resCnt++;
@@ -219,40 +239,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         if (rcvAll)
             finishUpdateFuture(opRes0, err0, remapTopVer0);
-        else if (res != null)
-            onPrimaryResponse(nodeId, res, true);
 
         return false;
     }
 
-    /**
-     * @param req Request.
-     * @param nodeId Failed node ID.
-     * @return Response to notify about primary failure.
-     */
-    private GridNearAtomicUpdateResponse primaryFailedResponse(GridNearAtomicAbstractUpdateRequest req, UUID nodeId) {
-        assert req.response() == null : req;
-
-        if (msgLog.isDebugEnabled()) {
-            msgLog.debug("Near update fut, node left [futId=" + req.futureId() +
-                ", node=" + nodeId + ']');
-        }
-
-        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
-            nodeId,
-            req.futureId(),
-            cctx.deploymentEnabled());
-
-        ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
-            "before response is received: " + nodeId);
-
-        e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
-
-        res.addFailedKeys(req.keys(), e);
-
-        return res;
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
         return null;
@@ -373,7 +363,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 if (req == null)
                     return;
 
-                rcvAll = singleReq.onPrimaryResponse(res);
+                rcvAll = singleReq.onPrimaryResponse(res, cctx);
             }
             else {
                 if (mappings == null)
@@ -387,7 +377,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 req = reqState.processPrimaryResponse(nodeId);
 
                 if (req != null) {
-                    if (reqState.onPrimaryResponse(res)) {
+                    if (reqState.onPrimaryResponse(res, cctx)) {
                         assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
 
                         resCnt++;
@@ -404,7 +394,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     return;
             }
 
-            assert req != null && req.topologyVersion().equals(topVer) : req;
+            assert req.topologyVersion().equals(topVer) : req;
 
             if (res.remapTopologyVersion() != null) {
                 assert !req.topologyVersion().equals(res.remapTopologyVersion());
@@ -417,20 +407,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 if (remapTopVer == null || remapTopVer.compareTo(res.remapTopologyVersion()) < 0)
                     remapTopVer = req.topologyVersion();
             }
-            else if (res.error() != null) {
-                if (res.failedKeys() != null) {
-                    if (err == null)
-                        err = new CachePartialUpdateCheckedException(
-                            "Failed to update keys (retry update if possible).");
-
-                    Collection<Object> keys = new ArrayList<>(res.failedKeys().size());
-
-                    for (KeyCacheObject key : res.failedKeys())
-                        keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
-
-                    err.add(keys, res.error(), req.topologyVersion());
-                }
-            }
+            else if (res.error() != null)
+                onPrimaryError(req, res);
             else {
                 GridCacheReturn ret = res.returnValue();
 
@@ -768,20 +746,20 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         int size = keys.size();
 
         try {
-            boolean stableTop = cctx.topology().rebalanceFinished(topVer) &&
+            boolean mappingKnown = cctx.topology().rebalanceFinished(topVer) &&
                 !cctx.discovery().hasNearCache(cctx.cacheId(), topVer);
 
             if (size == 1) {
                 assert remapKeys == null || remapKeys.size() == 1;
 
-                singleReq0 = mapSingleUpdate(topVer, futId, stableTop);
+                singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown);
             }
             else {
                 Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes,
                     topVer,
                     futId,
                     remapKeys,
-                    stableTop);
+                    mappingKnown);
 
                 if (pendingMappings.size() == 1)
                     singleReq0 = F.firstValue(pendingMappings);
@@ -813,7 +791,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 return;
             }
 
-            if (stableTop && syncMode == FULL_SYNC && cctx.discovery().topologyVersion() != topVer.topologyVersion()) {
+            if (mappingKnown && syncMode == FULL_SYNC && cctx.discovery().topologyVersion() != topVer.topologyVersion()) {
                 if (!checkDhtNodes(futId))
                     return;
             }
@@ -922,7 +900,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         AffinityTopologyVersion topVer,
         Long futId,
         @Nullable Collection<KeyCacheObject> remapKeys,
-        boolean stableTop) throws Exception {
+        boolean mappingKnown) throws Exception {
         Iterator<?> it = null;
 
         if (vals != null)
@@ -1020,7 +998,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     filter,
                     subjId,
                     taskNameHash,
-                    stableTop,
+                    mappingKnown,
                     skipStore,
                     keepBinary,
                     cctx.deploymentEnabled(),
@@ -1031,7 +1009,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 pendingMappings.put(nodeId, mapped);
             }
 
-            if (mapped.req.dhtReplyToNear())
+            if (mapped.req.initMappingLocally())
                 mapped.addMapping(nodes);
 
             mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
@@ -1046,7 +1024,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @return Request.
      * @throws Exception If failed.
      */
-    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean stableTop) throws Exception {
+    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean mappingKnown) throws Exception {
         Object key = F.first(keys);
 
         Object val;
@@ -1121,7 +1099,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             filter,
             subjId,
             taskNameHash,
-            stableTop,
+            mappingKnown,
             skipStore,
             keepBinary,
             cctx.deploymentEnabled(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index c2600a3..f9e6de8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -63,22 +63,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /** Future ID. */
     private long futId;
 
-    /** Update error. */
-    @GridDirectTransient
-    private volatile IgniteCheckedException err;
-
-    /** Serialized error. */
-    private byte[] errBytes;
+    /** */
+    private UpdateErrors errs;
 
     /** Return value. */
     @GridToStringInclude
     private GridCacheReturn ret;
 
-    /** Failed keys. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private volatile Collection<KeyCacheObject> failedKeys;
-
     /** */
     private AffinityTopologyVersion remapTopVer;
 
@@ -107,6 +98,11 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /** Partition ID. */
     private int partId = -1;
 
+    /** */
+    @GridDirectCollection(UUID.class)
+    @GridToStringInclude
+    private List<UUID> dhtNodes;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -132,6 +128,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
         return CACHE_MSG_IDX;
     }
 
+    public void dhtNodes(List<UUID> dhtNodes) {
+        this.dhtNodes = dhtNodes;
+    }
+
+    public List<UUID> dhtNodes() {
+        return dhtNodes;
+    }
+
     /**
      * @return Node ID this response should be sent to.
      */
@@ -166,19 +170,22 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param err Error.
      */
     public void error(IgniteCheckedException err){
-        this.err = err;
+        if (errs == null)
+            errs = new UpdateErrors();
+
+        errs.onError(err);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteCheckedException error() {
-        return err;
+        return errs != null ? errs.error() : null;
     }
 
     /**
      * @return Collection of failed keys.
      */
     public Collection<KeyCacheObject> failedKeys() {
-        return failedKeys;
+        return errs != null ? errs.failedKeys() : null;
     }
 
     /**
@@ -348,15 +355,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param e Error cause.
      */
     public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
-        if (failedKeys == null)
-            failedKeys = new ConcurrentLinkedQueue<>();
-
-        failedKeys.add(key);
+        if (errs == null)
+            errs = new UpdateErrors();
 
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
-
-        err.addSuppressed(e);
+        errs.addFailedKey(key, e);
     }
 
     /**
@@ -365,37 +367,11 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param keys Key to add.
      * @param e Error cause.
      */
-    public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
-        if (keys != null) {
-            if (failedKeys == null)
-                failedKeys = new ArrayList<>(keys.size());
+    synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
+        if (errs == null)
+            errs = new UpdateErrors();
 
-            failedKeys.addAll(keys);
-        }
-
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
-
-        err.addSuppressed(e);
-    }
-
-    /**
-     * Adds keys to collection of failed keys.
-     *
-     * @param keys Key to add.
-     * @param e Error cause.
-     * @param ctx Context.
-     */
-    public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e, GridCacheContext ctx) {
-        if (failedKeys == null)
-            failedKeys = new ArrayList<>(keys.size());
-
-        failedKeys.addAll(keys);
-
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
-
-        err.addSuppressed(e);
+        errs.addFailedKeys(keys, e);
     }
 
     /** {@inheritDoc}
@@ -403,12 +379,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (err != null && errBytes == null)
-            errBytes = U.marshal(ctx, err);
-
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        prepareMarshalCacheObjects(failedKeys, cctx);
+        if (errs != null)
+            errs.prepareMarshal(this, cctx);
 
         prepareMarshalCacheObjects(nearVals, cctx);
 
@@ -420,12 +394,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (errBytes != null && err == null)
-            err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
-
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
+        if (errs != null)
+            errs.finishUnmarshal(this, cctx, ldr);
 
         finishUnmarshalCacheObjects(nearVals, cctx, ldr);
 
@@ -464,13 +436,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeByteArray("errBytes", errBytes))
+                if (!writer.writeCollection("dhtNodes", dhtNodes, MessageCollectionItemType.UUID))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("errs", errs))
                     return false;
 
                 writer.incrementState();
@@ -552,7 +524,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
         switch (reader.state()) {
             case 3:
-                errBytes = reader.readByteArray("errBytes");
+                dhtNodes = reader.readCollection("dhtNodes", MessageCollectionItemType.UUID);
 
                 if (!reader.isLastRead())
                     return false;
@@ -560,7 +532,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 4:
-                failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+                errs = reader.readMessage("errs");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ab159de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
index 76f656f..9f7945f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
@@ -88,7 +88,7 @@ public class UpdateErrors implements Message {
      * @param key Key to add.
      * @param e Error cause.
      */
-    public void addFailedKey(KeyCacheObject key, Throwable e) {
+    void addFailedKey(KeyCacheObject key, Throwable e) {
         if (failedKeys == null)
             failedKeys = new ArrayList<>();
 
@@ -100,6 +100,18 @@ public class UpdateErrors implements Message {
         err.addSuppressed(e);
     }
 
+    void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
+        if (failedKeys == null)
+            failedKeys = new ArrayList<>(keys.size());
+
+        failedKeys.addAll(keys);
+
+        if (err == null)
+            err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+        err.addSuppressed(e);
+    }
+
     /** {@inheritDoc} */
     void prepareMarshal(GridCacheMessage msg, GridCacheContext cctx) throws IgniteCheckedException {
         msg.prepareMarshalCacheObjects(failedKeys, cctx);


Mime
View raw message