ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [28/40] ignite git commit: ignite-4705 Atomic cache protocol change: notify client node from backups
Date Tue, 14 Mar 2017 15:00:22 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index f182ecb..a44ccf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -47,10 +46,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtom
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -58,18 +55,14 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 
 /**
  * DHT atomic cache near update future.
  */
 public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFuture {
-    /** Fast map flag. */
-    private final boolean fastMap;
-
     /** Keys */
     private Collection<?> keys;
 
@@ -87,13 +80,18 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
     /** Mappings if operations is mapped to more than one node. */
     @GridToStringInclude
-    private Map<UUID, GridNearAtomicFullUpdateRequest> mappings;
+    private Map<UUID, PrimaryRequestState> mappings;
 
     /** Keys to remap. */
+    @GridToStringInclude
     private Collection<KeyCacheObject> remapKeys;
 
     /** Not null is operation is mapped to single node. */
-    private GridNearAtomicFullUpdateRequest singleReq;
+    @GridToStringInclude
+    private PrimaryRequestState singleReq;
+
+    /** */
+    private int resCnt;
 
     /**
      * @param cctx Cache context.
@@ -149,84 +147,124 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         this.vals = vals;
         this.conflictPutVals = conflictPutVals;
         this.conflictRmvVals = conflictRmvVals;
-
-        fastMap = cache.isFastMap(filter, op);
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
+    @Override public Long id() {
         synchronized (mux) {
-            return futVer;
+            return futId;
         }
     }
 
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
-        GridNearAtomicUpdateResponse res = null;
+        GridCacheReturn opRes0 = null;
+        CachePartialUpdateCheckedException err0 = null;
+        AffinityTopologyVersion remapTopVer0 = null;
+
+        boolean rcvAll = false;
 
-        GridNearAtomicFullUpdateRequest req;
+        List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
 
         synchronized (mux) {
-            if (singleReq != null)
-                req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
-            else
-                req = mappings != null ? mappings.get(nodeId) : null;
+            if (futId == null)
+                return false;
 
-            if (req != null && req.response() == null) {
-                res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
-                    nodeId,
-                    req.futureVersion(),
-                    cctx.deploymentEnabled());
+            if (singleReq != null) {
+                if (singleReq.req.nodeId.equals(nodeId)) {
+                    GridNearAtomicAbstractUpdateRequest req = singleReq.onPrimaryFail();
 
-                ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
-                    "before response is received: " + nodeId);
+                    if (req != null) {
+                        rcvAll = true;
 
-                e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+                        GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
 
-                res.addFailedKeys(req.keys(), e);
-            }
-        }
+                        singleReq.onPrimaryResponse(res, cctx);
+
+                        onPrimaryError(req, res);
+                    }
+                }
+                else {
+                    DhtLeftResult res = singleReq.onDhtNodeLeft(nodeId);
+
+                    if (res == DhtLeftResult.DONE)
+                        rcvAll = true;
+                    else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY)
+                        checkReqs = Collections.singletonList(new GridNearAtomicCheckUpdateRequest(singleReq.req));
+                }
 
-        if (res != null) {
-            if (msgLog.isDebugEnabled()) {
-                msgLog.debug("Near update fut, node left [futId=" + req.futureVersion() +
-                    ", writeVer=" + req.updateVersion() +
-                    ", node=" + nodeId + ']');
+                if (rcvAll) {
+                    opRes0 = opRes;
+                    err0 = err;
+                    remapTopVer0 = onAllReceived();
+                }
             }
+            else {
+                if (mappings == null)
+                    return false;
 
-            onResult(nodeId, res, true);
-        }
+                for (Map.Entry<UUID, PrimaryRequestState> e : mappings.entrySet()) {
+                    assert e.getKey().equals(e.getValue().req.nodeId());
 
-        return false;
-    }
+                    PrimaryRequestState reqState = e.getValue();
 
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
-        // Wait fast-map near atomic update futures in CLOCK mode.
-        if (fastMap) {
-            GridFutureAdapter<Void> fut;
+                    boolean reqDone = false;
 
-            synchronized (mux) {
-                if (this.topVer != AffinityTopologyVersion.ZERO && this.topVer.compareTo(topVer) < 0) {
-                    if (topCompleteFut == null)
-                        topCompleteFut = new GridFutureAdapter<>();
+                    if (e.getKey().equals(nodeId)) {
+                        GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail();
 
-                    fut = topCompleteFut;
-                }
-                else
-                    fut = null;
-            }
+                        if (req != null) {
+                            reqDone = true;
+
+                            GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
+
+                            reqState.onPrimaryResponse(res, cctx);
+
+                            onPrimaryError(req, res);
+                        }
+                    }
+                    else {
+                        DhtLeftResult res = reqState.onDhtNodeLeft(nodeId);
+
+                        if (res == DhtLeftResult.DONE)
+                            reqDone = true;
+                        else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) {
+                            if (checkReqs == null)
+                                checkReqs = new ArrayList<>();
 
-            if (fut != null && isDone()) {
-                fut.onDone();
+                            checkReqs.add(new GridNearAtomicCheckUpdateRequest(reqState.req));
+                        }
+                    }
+
+                    if (reqDone) {
+                        assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+                        resCnt++;
+
+                        if (mappings.size() == resCnt) {
+                            rcvAll = true;
 
-                return null;
+                            opRes0 = opRes;
+                            err0 = err;
+                            remapTopVer0 = onAllReceived();
+
+                            break;
+                        }
+                    }
+                }
             }
+        }
 
-            return fut;
+        if (checkReqs != null) {
+            assert !rcvAll;
+
+            for (int i = 0; i < checkReqs.size(); i++)
+                sendCheckUpdateRequest(checkReqs.get(i));
         }
+        else if (rcvAll)
+            finishUpdateFuture(opRes0, err0, remapTopVer0);
 
-        return null;
+        return false;
     }
 
     /** {@inheritDoc} */
@@ -244,10 +282,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             retval = Collections.emptyMap();
 
         if (super.onDone(retval, err)) {
-            GridCacheVersion futVer = onFutureDone();
+            Long futId = onFutureDone();
 
-            if (futVer != null)
-                cctx.mvcc().removeAtomicFuture(futVer);
+            if (futId != null)
+                cctx.mvcc().removeAtomicFuture(futId);
 
             return true;
         }
@@ -256,145 +294,166 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-    @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
-        GridNearAtomicFullUpdateRequest req;
-
-        AffinityTopologyVersion remapTopVer = null;
-
-        GridCacheReturn opRes0 = null;
-        CachePartialUpdateCheckedException err0 = null;
-
-        boolean rcvAll;
-
-        GridFutureAdapter<?> fut0 = null;
+    @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+        GridCacheReturn opRes0;
+        CachePartialUpdateCheckedException err0;
+        AffinityTopologyVersion remapTopVer0;
 
         synchronized (mux) {
-            if (!res.futureVersion().equals(futVer))
+            if (futId == null || futId != res.futureId())
                 return;
 
-            if (singleReq != null) {
-                if (!singleReq.nodeId().equals(nodeId))
-                    return;
+            PrimaryRequestState reqState;
 
-                req = singleReq;
+            if (singleReq != null) {
+                assert singleReq.req.nodeId().equals(res.primaryId());
 
-                singleReq = null;
+                if (opRes == null && res.hasResult())
+                    opRes = res.result();
 
-                rcvAll = true;
+                if (singleReq.onDhtResponse(nodeId, res)) {
+                    opRes0 = opRes;
+                    err0 = err;
+                    remapTopVer0 = onAllReceived();
+                }
+                else
+                    return;
             }
             else {
-                req = mappings != null ? mappings.get(nodeId) : null;
+                reqState = mappings != null ? mappings.get(res.primaryId()) : null;
 
-                if (req != null && req.onResponse(res)) {
-                    resCnt++;
+                if (reqState != null) {
+                    if (opRes == null && res.hasResult())
+                        opRes = res.result();
 
-                    rcvAll = mappings.size() == resCnt;
+                    if (reqState.onDhtResponse(nodeId, res)) {
+                        assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+                        resCnt++;
+
+                        if (mappings.size() == resCnt) {
+                            opRes0 = opRes;
+                            err0 = err;
+                            remapTopVer0 = onAllReceived();
+                        }
+                        else
+                            return;
+                    }
+                    else
+                        return;
                 }
                 else
                     return;
             }
+        }
 
-            assert req != null && req.topologyVersion().equals(topVer) : req;
+        UpdateErrors errors = res.errors();
 
-            if (res.remapKeys() != null) {
-                assert !fastMap || cctx.kernalContext().clientNode();
+        if (errors != null) {
+            assert errors.error() != null;
 
-                if (remapKeys == null)
-                    remapKeys = U.newHashSet(res.remapKeys().size());
+            onDone(errors.error());
+
+            return;
+        }
 
-                remapKeys.addAll(res.remapKeys());
+        finishUpdateFuture(opRes0, err0, remapTopVer0);
+    }
 
-                if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
-                    mapErrTopVer = req.topologyVersion();
-            }
-            else if (res.error() != null) {
-                if (res.failedKeys() != null) {
-                    if (err == null)
-                        err = new CachePartialUpdateCheckedException(
-                            "Failed to update keys (retry update if possible).");
+    /** {@inheritDoc} */
+    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+    @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+        GridNearAtomicAbstractUpdateRequest req;
 
-                    Collection<Object> keys = new ArrayList<>(res.failedKeys().size());
+        AffinityTopologyVersion remapTopVer0 = null;
 
-                    for (KeyCacheObject key : res.failedKeys())
-                        keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+        GridCacheReturn opRes0 = null;
+        CachePartialUpdateCheckedException err0 = null;
 
-                    err.add(keys, res.error(), req.topologyVersion());
-                }
+        boolean rcvAll;
+
+        synchronized (mux) {
+            if (futId == null || futId != res.futureId())
+                return;
+
+            if (singleReq != null) {
+                req = singleReq.processPrimaryResponse(nodeId, res);
+
+                if (req == null)
+                    return;
+
+                rcvAll = singleReq.onPrimaryResponse(res, cctx);
             }
             else {
-                if (!req.fastMap() || req.hasPrimary()) {
-                    GridCacheReturn ret = res.returnValue();
-
-                    if (op == TRANSFORM) {
-                        if (ret != null) {
-                            assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
-                            if (ret.value() != null) {
-                                if (opRes != null)
-                                    opRes.mergeEntryProcessResults(ret);
-                                else
-                                    opRes = ret;
-                            }
-                        }
-                    }
-                    else
-                        opRes = ret;
-                }
-            }
+                if (mappings == null)
+                    return;
 
-            if (rcvAll) {
-                if (remapKeys != null) {
-                    assert mapErrTopVer != null;
+                PrimaryRequestState reqState = mappings.get(nodeId);
 
-                    remapTopVer = cctx.shared().exchange().topologyVersion();
-                }
-                else {
-                    if (err != null &&
-                        X.hasCause(err, CachePartialUpdateCheckedException.class) &&
-                        X.hasCause(err, ClusterTopologyCheckedException.class) &&
-                        storeFuture() &&
-                        --remapCnt > 0) {
-                        ClusterTopologyCheckedException topErr =
-                            X.cause(err, ClusterTopologyCheckedException.class);
+                if (reqState == null)
+                    return;
 
-                        if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
-                            CachePartialUpdateCheckedException cause =
-                                X.cause(err, CachePartialUpdateCheckedException.class);
+                req = reqState.processPrimaryResponse(nodeId, res);
+
+                if (req != null) {
+                    if (reqState.onPrimaryResponse(res, cctx)) {
+                        assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+                        resCnt++;
+
+                        rcvAll = mappings.size() == resCnt;
+                    }
+                    else {
+                        assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+                        rcvAll = false;
+                    }
+                }
+                else
+                    return;
+            }
 
-                            assert cause != null && cause.topologyVersion() != null : err;
+            assert req.topologyVersion().equals(topVer) : req;
 
-                            remapTopVer =
-                                new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+            if (res.remapTopologyVersion() != null) {
+                assert !req.topologyVersion().equals(res.remapTopologyVersion());
 
-                            err = null;
+                if (remapKeys == null)
+                    remapKeys = U.newHashSet(req.size());
 
-                            Collection<Object> failedKeys = cause.failedKeys();
+                remapKeys.addAll(req.keys());
 
-                            remapKeys = new ArrayList<>(failedKeys.size());
+                if (remapTopVer == null || remapTopVer.compareTo(res.remapTopologyVersion()) < 0)
+                    remapTopVer = req.topologyVersion();
+            }
+            else if (res.error() != null)
+                onPrimaryError(req, res);
+            else {
+                GridCacheReturn ret = res.returnValue();
 
-                            for (Object key : failedKeys)
-                                remapKeys.add(cctx.toCacheKeyObject(key));
+                if (op == TRANSFORM) {
+                    if (ret != null) {
+                        assert ret.value() == null || ret.value() instanceof Map : ret.value();
 
-                            updVer = null;
+                        if (ret.value() != null) {
+                            if (opRes != null)
+                                opRes.mergeEntryProcessResults(ret);
+                            else
+                                opRes = ret;
                         }
                     }
                 }
+                else
+                    opRes = ret;
+            }
+
+            if (rcvAll) {
+                remapTopVer0 = onAllReceived();
 
-                if (remapTopVer == null) {
+                if (remapTopVer0 == null) {
                     err0 = err;
                     opRes0 = opRes;
                 }
-                else {
-                    fut0 = topCompleteFut;
-
-                    topCompleteFut = null;
-
-                    cctx.mvcc().removeAtomicFuture(futVer);
-
-                    futVer = null;
-                    topVer = AffinityTopologyVersion.ZERO;
-                }
             }
         }
 
@@ -406,67 +465,160 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         if (rcvAll && nearEnabled) {
             if (mappings != null) {
-                for (GridNearAtomicFullUpdateRequest req0 : mappings.values()) {
-                    GridNearAtomicUpdateResponse res0 = req0.response();
+                for (PrimaryRequestState reqState : mappings.values()) {
+                    GridNearAtomicUpdateResponse res0 = reqState.req.response();
 
-                    assert res0 != null : req0;
+                    assert res0 != null : reqState;
 
-                    updateNear(req0, res0);
+                    updateNear(reqState.req, res0);
                 }
             }
             else if (!nodeErr)
                 updateNear(req, res);
         }
 
-        if (remapTopVer != null) {
-            if (fut0 != null)
-                fut0.onDone();
+        if (remapTopVer0 != null) {
+            waitAndRemap(remapTopVer0);
 
-            if (!waitTopFut) {
-                onDone(new GridCacheTryPutFailedException());
+            return;
+        }
 
-                return;
+        if (rcvAll)
+            onDone(opRes0, err0);
+    }
+
+    private void waitAndRemap(AffinityTopologyVersion remapTopVer) {
+        assert remapTopVer != null;
+
+        if (!waitTopFut) {
+            onDone(new GridCacheTryPutFailedException());
+
+            return;
+        }
+
+        if (topLocked) {
+            assert !F.isEmpty(remapKeys) : remapKeys;
+
+            CachePartialUpdateCheckedException e =
+                new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+            ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+                "Failed to update keys, topology changed while execute atomic update inside transaction.");
+
+            cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+            e.add(remapKeys, cause);
+
+            onDone(e);
+
+            return;
+        }
+
+        IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+        if (fut == null)
+            fut = new GridFinishedFuture<>(remapTopVer);
+
+        fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+            @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        mapOnTopology();
+                    }
+                });
             }
+        });
+    }
+
+    /**
+     * @return Non null topology version if update should be remapped.
+     */
+    @Nullable private AffinityTopologyVersion onAllReceived() {
+        assert futId != null;
 
-            if (topLocked) {
-                assert !F.isEmpty(remapKeys) : remapKeys;
+        AffinityTopologyVersion remapTopVer0 = null;
 
-                CachePartialUpdateCheckedException e =
-                    new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+        if (remapKeys != null) {
+            assert remapTopVer != null;
 
-                ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
-                    "Failed to update keys, topology changed while execute atomic update inside transaction.");
+            remapTopVer0 = remapTopVer;
+        }
+        else {
+            if (err != null &&
+                X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+                X.hasCause(err, ClusterTopologyCheckedException.class) &&
+                storeFuture() &&
+                --remapCnt > 0) {
+                ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
 
-                cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+                if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                    CachePartialUpdateCheckedException cause =
+                        X.cause(err, CachePartialUpdateCheckedException.class);
 
-                e.add(remapKeys, cause);
+                    assert cause != null && cause.topologyVersion() != null : err;
+                    assert remapKeys == null;
+                    assert remapTopVer == null;
 
-                onDone(e);
+                    remapTopVer = remapTopVer0 =
+                        new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
 
-                return;
+                    err = null;
+
+                    Collection<Object> failedKeys = cause.failedKeys();
+
+                    remapKeys = new ArrayList<>(failedKeys.size());
+
+                    for (Object key : failedKeys)
+                        remapKeys.add(cctx.toCacheKeyObject(key));
+                }
             }
+        }
 
-            IgniteInternalFuture<AffinityTopologyVersion> fut =
-                cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+        if (remapTopVer0 != null) {
+            cctx.mvcc().removeAtomicFuture(futId);
 
-            if (fut == null)
-                fut = new GridFinishedFuture<>(remapTopVer);
+            futId = null;
+            topVer = AffinityTopologyVersion.ZERO;
 
-            fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                        @Override public void run() {
-                            mapOnTopology();
-                        }
-                    });
+            remapTopVer = null;
+        }
+
+        return remapTopVer0;
+    }
+
+    /**
+     * @param opRes Operation result.
+     * @param err Operation error.
+     */
+    private void finishUpdateFuture(GridCacheReturn opRes,
+        CachePartialUpdateCheckedException err,
+        @Nullable AffinityTopologyVersion remapTopVer) {
+        if (nearEnabled) {
+            if (mappings != null) {
+                for (PrimaryRequestState reqState : mappings.values()) {
+                    GridNearAtomicUpdateResponse res0 = reqState.req.response();
+
+                    assert res0 != null : reqState;
+
+                    updateNear(reqState.req, res0);
                 }
-            });
+            }
+            else {
+                assert singleReq != null && singleReq.req.response() != null;
+
+                updateNear(singleReq.req, singleReq.req.response());
+            }
+        }
+
+        if (remapTopVer != null) {
+            assert !F.isEmpty(remapKeys);
+
+            waitAndRemap(remapTopVer);
 
             return;
         }
 
-        if (rcvAll)
-            onDone(opRes0, err0);
+        onDone(opRes, err);
     }
 
     /**
@@ -475,10 +627,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @param req Update request.
      * @param res Update response.
      */
-    private void updateNear(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) {
+    private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
         assert nearEnabled;
 
-        if (res.remapKeys() != null || !req.hasPrimary())
+        if (res.remapTopologyVersion() != null)
             return;
 
         GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
@@ -489,59 +641,48 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     /** {@inheritDoc} */
     @Override protected void mapOnTopology() {
         AffinityTopologyVersion topVer;
-        GridCacheVersion futVer;
-
-        cache.topology().readLock();
-
-        try {
-            if (cache.topology().stopping()) {
-                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
-                    cache.name()));
 
-                return;
-            }
+        if (cache.topology().stopping()) {
+            onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                cache.name()));
 
-            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
-
-            if (fut.isDone()) {
-                Throwable err = fut.validateCache(cctx);
+            return;
+        }
 
-                if (err != null) {
-                    onDone(err);
+        GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
 
-                    return;
-                }
+        if (fut.isDone()) {
+            Throwable err = fut.validateCache(cctx);
 
-                topVer = fut.topologyVersion();
-
-                futVer = addAtomicFuture(topVer);
-            }
-            else {
-                if (waitTopFut) {
-                    assert !topLocked : this;
-
-                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                                @Override public void run() {
-                                    mapOnTopology();
-                                }
-                            });
-                        }
-                    });
-                }
-                else
-                    onDone(new GridCacheTryPutFailedException());
+            if (err != null) {
+                onDone(err);
 
                 return;
             }
+
+            topVer = fut.topologyVersion();
         }
-        finally {
-            cache.topology().readUnlock();
+        else {
+            if (waitTopFut) {
+                assert !topLocked : this;
+
+                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                mapOnTopology();
+                            }
+                        });
+                    }
+                });
+            }
+            else
+                onDone(new GridCacheTryPutFailedException());
+
+            return;
         }
 
-        if (futVer != null)
-            map(topVer, futVer, remapKeys);
+        map(topVer, remapKeys);
     }
 
     /**
@@ -549,13 +690,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      *
      * @param mappings Mappings to send.
      */
-    private void doUpdate(Map<UUID, GridNearAtomicFullUpdateRequest> mappings) {
+    private void sendUpdateRequests(Map<UUID, PrimaryRequestState> mappings) {
         UUID locNodeId = cctx.localNodeId();
 
-        GridNearAtomicFullUpdateRequest locUpdate = null;
+        GridNearAtomicAbstractUpdateRequest locUpdate = null;
 
         // Send messages to remote nodes first, then run local update.
-        for (GridNearAtomicFullUpdateRequest req : mappings.values()) {
+        for (PrimaryRequestState reqState : mappings.values()) {
+            GridNearAtomicAbstractUpdateRequest req = reqState.req;
+
             if (locNodeId.equals(req.nodeId())) {
                 assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
                     ", req=" + req + ']';
@@ -564,18 +707,22 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
             else {
                 try {
+                    if (req.initMappingLocally() && reqState.dhtNodes.isEmpty()) {
+                        reqState.dhtNodes = null;
+
+                        req.needPrimaryResponse(true);
+                    }
+
                     cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
                     if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() +
-                            ", writeVer=" + req.updateVersion() +
+                        msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
                             ", node=" + req.nodeId() + ']');
                     }
                 }
                 catch (IgniteCheckedException e) {
                     if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() +
-                            ", writeVer=" + req.updateVersion() +
+                        msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
                             ", node=" + req.nodeId() +
                             ", err=" + e + ']');
                     }
@@ -587,9 +734,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         if (locUpdate != null) {
             cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
-                new CI2<GridNearAtomicFullUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) {
-                        onResult(res.nodeId(), res, false);
+                new GridDhtAtomicCache.UpdateReplyClosure() {
+                    @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
+                        if (syncMode != FULL_ASYNC)
+                            onPrimaryResponse(res.nodeId(), res, false);
+                        else if (res.remapTopologyVersion() != null)
+                            ((GridDhtAtomicCache)cctx.cache()).remapToNewPrimary(req);
                     }
                 });
         }
@@ -599,18 +749,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) {
-        map(topVer, futVer, null);
+    @Override protected void map(AffinityTopologyVersion topVer) {
+        map(topVer, null);
     }
 
     /**
      * @param topVer Topology version.
-     * @param futVer Future ID.
      * @param remapKeys Keys to remap.
      */
-    void map(AffinityTopologyVersion topVer,
-        GridCacheVersion futVer,
-        @Nullable Collection<KeyCacheObject> remapKeys) {
+    void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -620,64 +767,45 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             return;
         }
 
-        GridCacheVersion updVer;
-
-        // Assign version on near node in CLOCK ordering mode even if fastMap is false.
-        if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
-            updVer = this.updVer;
-
-            if (updVer == null) {
-                updVer = futVer;
-
-                if (log.isDebugEnabled())
-                    log.debug("Assigned fast-map version for update on near node: " + updVer);
-            }
-        }
-        else
-            updVer = null;
+        Long futId = cctx.mvcc().atomicFutureId();
 
         Exception err = null;
-        GridNearAtomicFullUpdateRequest singleReq0 = null;
-        Map<UUID, GridNearAtomicFullUpdateRequest> mappings0 = null;
+        PrimaryRequestState singleReq0 = null;
+        Map<UUID, PrimaryRequestState> mappings0 = null;
 
         int size = keys.size();
 
+        boolean mappingKnown = cctx.topology().rebalanceFinished(topVer) &&
+            !cctx.discovery().hasNearCache(cctx.cacheId(), topVer);
+
         try {
-            if (size == 1 && !fastMap) {
+            if (size == 1) {
                 assert remapKeys == null || remapKeys.size() == 1;
 
-                singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+                singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown);
             }
             else {
-                Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = mapUpdate(topNodes,
+                Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes,
                     topVer,
-                    futVer,
-                    updVer,
-                    remapKeys);
+                    futId,
+                    remapKeys,
+                    mappingKnown);
 
                 if (pendingMappings.size() == 1)
                     singleReq0 = F.firstValue(pendingMappings);
                 else {
-                    if (syncMode == PRIMARY_SYNC) {
-                        mappings0 = U.newHashMap(pendingMappings.size());
-
-                        for (GridNearAtomicFullUpdateRequest req : pendingMappings.values()) {
-                            if (req.hasPrimary())
-                                mappings0.put(req.nodeId(), req);
-                        }
-                    }
-                    else
-                        mappings0 = pendingMappings;
+                    mappings0 = pendingMappings;
 
                     assert !mappings0.isEmpty() || size == 0 : this;
                 }
             }
 
             synchronized (mux) {
-                assert this.futVer == futVer || (this.isDone() && this.error() != null);
-                assert this.topVer == topVer;
+                assert this.futId == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
-                this.updVer = updVer;
+                this.topVer = topVer;
+                this.futId = futId;
 
                 resCnt = 0;
 
@@ -686,6 +814,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                 this.remapKeys = null;
             }
+
+            if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) {
+                assert isDone();
+
+                return;
+            }
         }
         catch (Exception e) {
             err = e;
@@ -699,56 +833,133 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         // Optimize mapping for single key.
         if (singleReq0 != null)
-            mapSingle(singleReq0.nodeId(), singleReq0);
+            sendSingleRequest(singleReq0.req.nodeId(), singleReq0.req);
         else {
             assert mappings0 != null;
 
-            if (size == 0)
+            if (size == 0) {
                 onDone(new GridCacheReturn(cctx, true, true, null, true));
+
+                return;
+            }
             else
-                doUpdate(mappings0);
+                sendUpdateRequests(mappings0);
         }
+
+        if (syncMode == FULL_ASYNC) {
+            onDone(new GridCacheReturn(cctx, true, true, null, true));
+
+            return;
+        }
+
+        if (mappingKnown && syncMode == FULL_SYNC && cctx.discovery().topologyVersion() != topVer.topologyVersion())
+            checkDhtNodes(futId);
     }
 
-    /**
-     * @return Future version.
-     */
-    private GridCacheVersion onFutureDone() {
-        GridCacheVersion ver0;
+    private void checkDhtNodes(Long futId) {
+        GridCacheReturn opRes0 = null;
+        CachePartialUpdateCheckedException err0 = null;
+        AffinityTopologyVersion remapTopVer0 = null;
+
+        List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
 
-        GridFutureAdapter<Void> fut0;
+        boolean rcvAll = false;
 
         synchronized (mux) {
-            fut0 = topCompleteFut;
+            if (this.futId == null || !this.futId.equals(futId))
+                return;
+
+            if (singleReq != null) {
+                if (!singleReq.req.initMappingLocally())
+                    return;
+
+                DhtLeftResult res = singleReq.checkDhtNodes(cctx);
+
+                if (res == DhtLeftResult.DONE) {
+                    opRes0 = opRes;
+                    err0 = err;
+                    remapTopVer0 = onAllReceived();
+                }
+                else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY)
+                    checkReqs = Collections.singletonList(new GridNearAtomicCheckUpdateRequest(singleReq.req));
+                else
+                    return;
+            }
+            else {
+                if (mappings != null) {
+                    for (PrimaryRequestState reqState : mappings.values()) {
+                        if (!reqState.req.initMappingLocally())
+                            continue;
+
+                        DhtLeftResult res = reqState.checkDhtNodes(cctx);
+
+                        if (res == DhtLeftResult.DONE) {
+                            assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+                            resCnt++;
 
-            topCompleteFut = null;
+                            if (mappings.size() == resCnt) {
+                                rcvAll = true;
 
-            ver0 = futVer;
+                                opRes0 = opRes;
+                                err0 = err;
+                                remapTopVer0 = onAllReceived();
 
-            futVer = null;
+                                break;
+                            }
+                        }
+                        else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) {
+                            if (checkReqs == null)
+                                checkReqs = new ArrayList<>(mappings.size());
+
+                            checkReqs.add(new GridNearAtomicCheckUpdateRequest(reqState.req));
+                        }
+                    }
+                }
+                else
+                    return;
+            }
         }
 
-        if (fut0 != null)
-            fut0.onDone();
+        if (checkReqs != null) {
+            assert !rcvAll;
+
+            for (int i = 0; i < checkReqs.size(); i++)
+                sendCheckUpdateRequest(checkReqs.get(i));
+        }
+        else if (rcvAll)
+            finishUpdateFuture(opRes0, err0, remapTopVer0);
+    }
+
+    /**
+     * @return Future version.
+     */
+    private Long onFutureDone() {
+        Long id0;
+
+        synchronized (mux) {
+            id0 = futId;
+
+            futId = null;
+        }
 
-        return ver0;
+        return id0;
     }
 
     /**
      * @param topNodes Cache nodes.
      * @param topVer Topology version.
-     * @param futVer Future version.
-     * @param updVer Update version.
+     * @param futId Future ID.
      * @param remapKeys Keys to remap.
      * @return Mapping.
      * @throws Exception If failed.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private Map<UUID, GridNearAtomicFullUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+    private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes,
         AffinityTopologyVersion topVer,
-        GridCacheVersion futVer,
-        @Nullable GridCacheVersion updVer,
-        @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+        Long futId,
+        @Nullable Collection<KeyCacheObject> remapKeys,
+        boolean mappingKnown) throws Exception {
         Iterator<?> it = null;
 
         if (vals != null)
@@ -764,7 +975,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (conflictRmvVals != null)
             conflictRmvValsIt = conflictRmvVals.iterator();
 
-        Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+        Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size());
 
         // Create mappings first, then send messages.
         for (Object key : keys) {
@@ -819,55 +1030,50 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             else
                 val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
 
-            List<ClusterNode> affNodes = mapKey(cacheKey, topVer);
+            List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer);
 
-            if (affNodes.isEmpty())
+            if (F.isEmpty(nodes))
                 throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                     "(all partition nodes left the grid).");
 
-            int i = 0;
-
-            for (int n = 0; n < affNodes.size(); n++) {
-                ClusterNode affNode = affNodes.get(n);
-
-                if (affNode == null)
-                    throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                        "(all partition nodes left the grid).");
-
-                UUID nodeId = affNode.id();
-
-                GridNearAtomicFullUpdateRequest mapped = pendingMappings.get(nodeId);
-
-                if (mapped == null) {
-                    mapped = new GridNearAtomicFullUpdateRequest(
-                        cctx.cacheId(),
-                        nodeId,
-                        futVer,
-                        fastMap,
-                        updVer,
-                        topVer,
-                        topLocked,
-                        syncMode,
-                        op,
-                        retval,
-                        expiryPlc,
-                        invokeArgs,
-                        filter,
-                        subjId,
-                        taskNameHash,
-                        skipStore,
-                        keepBinary,
-                        cctx.kernalContext().clientNode(),
-                        cctx.deploymentEnabled(),
-                        keys.size());
-
-                    pendingMappings.put(nodeId, mapped);
-                }
+            ClusterNode primary = nodes.get(0);
+
+            boolean needPrimaryRes = !mappingKnown || primary.isLocal();
+
+            UUID nodeId = primary.id();
 
-                mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+            PrimaryRequestState mapped = pendingMappings.get(nodeId);
 
-                i++;
+            if (mapped == null) {
+                GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
+                    cctx.cacheId(),
+                    nodeId,
+                    futId,
+                    topVer,
+                    topLocked,
+                    syncMode,
+                    op,
+                    retval,
+                    expiryPlc,
+                    invokeArgs,
+                    filter,
+                    subjId,
+                    taskNameHash,
+                    needPrimaryRes,
+                    skipStore,
+                    keepBinary,
+                    cctx.deploymentEnabled(),
+                    keys.size());
+
+                mapped = new PrimaryRequestState(req, nodes, false);
+
+                pendingMappings.put(nodeId, mapped);
             }
+
+            if (mapped.req.initMappingLocally())
+                mapped.addMapping(nodes);
+
+            mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
         }
 
         return pendingMappings;
@@ -875,14 +1081,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
     /**
      * @param topVer Topology version.
-     * @param futVer Future version.
-     * @param updVer Update version.
+     * @param futId Future ID.
+     * @param mappingKnown {@code True} if update mapping is known locally.
      * @return Request.
      * @throws Exception If failed.
      */
-    private GridNearAtomicFullUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
-        GridCacheVersion futVer,
-        @Nullable GridCacheVersion updVer) throws Exception {
+    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean mappingKnown)
+        throws Exception {
         Object key = F.first(keys);
 
         Object val;
@@ -935,18 +1140,20 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         else
             val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
 
-        ClusterNode primary = cctx.affinity().primaryByPartition(cacheKey.partition(), topVer);
+        List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer);
 
-        if (primary == null)
-            throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
-                "left the grid).");
+        if (F.isEmpty(nodes))
+            throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                    "(all partition nodes left the grid).");
+
+        ClusterNode primary = nodes.get(0);
+
+        boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1;
 
         GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
             cctx.cacheId(),
             primary.id(),
-            futVer,
-            fastMap,
-            updVer,
+            futId,
             topVer,
             topLocked,
             syncMode,
@@ -957,9 +1164,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             filter,
             subjId,
             taskNameHash,
+            needPrimaryRes,
             skipStore,
             keepBinary,
-            cctx.kernalContext().clientNode(),
             cctx.deploymentEnabled(),
             1);
 
@@ -967,26 +1174,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             val,
             conflictTtl,
             conflictExpireTime,
-            conflictVer,
-            true);
-
-        return req;
-    }
-
-    /**
-     * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
-     * node and send updates in parallel to all participating nodes.
-     *
-     * @param key Key to map.
-     * @param topVer Topology version to map.
-     * @return Collection of nodes to which key is mapped.
-     */
-    private List<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer) {
-        GridCacheAffinityManager affMgr = cctx.affinity();
+            conflictVer);
 
-        // If we can send updates in parallel - do it.
-        return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) :
-            Collections.singletonList(affMgr.primaryByKey(key, topVer));
+        return new PrimaryRequestState(req, nodes, true);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 22e01ae..4e20fc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -23,11 +23,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -59,29 +58,18 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     @GridDirectTransient
     private UUID nodeId;
 
-    /** Future version. */
-    private GridCacheVersion futVer;
+    /** Future ID. */
+    private long futId;
 
-    /** Update error. */
-    @GridDirectTransient
-    private volatile IgniteCheckedException err;
-
-    /** Serialized error. */
-    private byte[] errBytes;
+    /** */
+    private UpdateErrors errs;
 
     /** Return value. */
     @GridToStringInclude
     private GridCacheReturn ret;
 
-    /** Failed keys. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private volatile Collection<KeyCacheObject> failedKeys;
-
-    /** Keys that should be remapped. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private List<KeyCacheObject> remapKeys;
+    /** */
+    private AffinityTopologyVersion remapTopVer;
 
     /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
     @GridDirectCollection(int.class)
@@ -108,6 +96,15 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /** Partition ID. */
     private int partId = -1;
 
+    /** */
+    @GridDirectCollection(UUID.class)
+    @GridToStringInclude
+    private List<UUID> dhtNodes;
+
+    /** */
+    @GridDirectTransient
+    private boolean nodeLeft;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -118,24 +115,52 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /**
      * @param cacheId Cache ID.
      * @param nodeId Node ID this reply should be sent to.
-     * @param futVer Future version.
+     * @param futId Future ID.
+     * @param partId Partition.
+     * @param nodeLeft {@code True} if primary node failed.
      * @param addDepInfo Deployment info flag.
      */
-    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) {
-        assert futVer != null;
-
+    public GridNearAtomicUpdateResponse(int cacheId,
+        UUID nodeId,
+        long futId,
+        int partId,
+        boolean nodeLeft,
+        boolean addDepInfo) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
-        this.futVer = futVer;
+        this.futId = futId;
+        this.partId = partId;
+        this.nodeLeft = nodeLeft;
         this.addDepInfo = addDepInfo;
     }
 
+    /**
+     * @return {@code True} if primary node failed.
+     */
+    public boolean nodeLeftResponse() {
+        return nodeLeft;
+    }
+
     /** {@inheritDoc} */
     @Override public int lookupIndex() {
         return CACHE_MSG_IDX;
     }
 
     /**
+     * @param dhtNodes DHT nodes.
+     */
+    public void dhtNodes(List<UUID> dhtNodes) {
+        this.dhtNodes = dhtNodes;
+    }
+
+    /**
+     * @return DHT nodes.
+     */
+    @Nullable public List<UUID> dhtNodes() {
+        return dhtNodes;
+    }
+
+    /**
      * @return Node ID this response should be sent to.
      */
     public UUID nodeId() {
@@ -150,17 +175,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     }
 
     /**
-     * @return Future version.
+     * @return Future ID.
      */
-    public GridCacheVersion futureVersion() {
-        return futVer;
-    }
-
-    /**
-     * @param partId Partition ID for proper striping on near node.
-     */
-    public void partition(int partId) {
-        this.partId = partId;
+    public long futureId() {
+        return futId;
     }
 
     /**
@@ -169,19 +187,22 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param err Error.
      */
     public void error(IgniteCheckedException err){
-        this.err = err;
+        if (errs == null)
+            errs = new UpdateErrors();
+
+        errs.onError(err);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteCheckedException error() {
-        return err;
+        return errs != null ? errs.error() : null;
     }
 
     /**
      * @return Collection of failed keys.
      */
     public Collection<KeyCacheObject> failedKeys() {
-        return failedKeys;
+        return errs != null ? errs.failedKeys() : null;
     }
 
     /**
@@ -200,17 +221,17 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     }
 
     /**
-     * @param remapKeys Remap keys.
+     * @param remapTopVer Topology version to remap update.
      */
-    public void remapKeys(List<KeyCacheObject> remapKeys) {
-        this.remapKeys = remapKeys;
+    void remapTopologyVersion(AffinityTopologyVersion remapTopVer) {
+        this.remapTopVer = remapTopVer;
     }
 
     /**
-     * @return Remap keys.
+     * @return Topology version if update should be remapped.
      */
-    public Collection<KeyCacheObject> remapKeys() {
-        return remapKeys;
+    @Nullable AffinityTopologyVersion remapTopologyVersion() {
+        return remapTopVer;
     }
 
     /**
@@ -221,7 +242,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param ttl TTL for near cache update.
      * @param expireTime Expire time for near cache update.
      */
-    public void addNearValue(int keyIdx,
+    void addNearValue(int keyIdx,
         @Nullable CacheObject val,
         long ttl,
         long expireTime) {
@@ -242,7 +263,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param expireTime Expire time for near cache update.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    public void addNearTtl(int keyIdx, long ttl, long expireTime) {
+    void addNearTtl(int keyIdx, long ttl, long expireTime) {
         if (ttl >= 0) {
             if (nearTtls == null) {
                 nearTtls = new GridLongList(16);
@@ -299,7 +320,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /**
      * @param nearVer Version generated on primary node to be used for originating node's near cache update.
      */
-    public void nearVersion(GridCacheVersion nearVer) {
+    void nearVersion(GridCacheVersion nearVer) {
         this.nearVer = nearVer;
     }
 
@@ -313,7 +334,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /**
      * @param keyIdx Index of key for which update was skipped
      */
-    public void addSkippedIndex(int keyIdx) {
+    void addSkippedIndex(int keyIdx) {
         if (nearSkipIdxs == null)
             nearSkipIdxs = new ArrayList<>();
 
@@ -351,35 +372,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param e Error cause.
      */
     public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
-        if (failedKeys == null)
-            failedKeys = new ConcurrentLinkedQueue<>();
-
-        failedKeys.add(key);
-
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
-
-        err.addSuppressed(e);
-    }
-
-    /**
-     * Adds keys to collection of failed keys.
-     *
-     * @param keys Key to add.
-     * @param e Error cause.
-     */
-    public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
-        if (keys != null) {
-            if (failedKeys == null)
-                failedKeys = new ArrayList<>(keys.size());
-
-            failedKeys.addAll(keys);
-        }
-
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
+        if (errs == null)
+            errs = new UpdateErrors();
 
-        err.addSuppressed(e);
+        errs.addFailedKey(key, e);
     }
 
     /**
@@ -387,18 +383,12 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      *
      * @param keys Key to add.
      * @param e Error cause.
-     * @param ctx Context.
      */
-    public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e, GridCacheContext ctx) {
-        if (failedKeys == null)
-            failedKeys = new ArrayList<>(keys.size());
+    synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
+        if (errs == null)
+            errs = new UpdateErrors();
 
-        failedKeys.addAll(keys);
-
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
-
-        err.addSuppressed(e);
+        errs.addFailedKeys(keys, e);
     }
 
     /** {@inheritDoc}
@@ -406,14 +396,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (err != null && errBytes == null)
-            errBytes = U.marshal(ctx, err);
-
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        prepareMarshalCacheObjects(failedKeys, cctx);
-
-        prepareMarshalCacheObjects(remapKeys, cctx);
+        if (errs != null)
+            errs.prepareMarshal(this, cctx);
 
         prepareMarshalCacheObjects(nearVals, cctx);
 
@@ -425,14 +411,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (errBytes != null && err == null)
-            err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
-
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
-
-        finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
+        if (errs != null)
+            errs.finishUnmarshal(this, cctx, ldr);
 
         finishUnmarshalCacheObjects(nearVals, cctx, ldr);
 
@@ -471,19 +453,19 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeByteArray("errBytes", errBytes))
+                if (!writer.writeCollection("dhtNodes", dhtNodes, MessageCollectionItemType.UUID))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("errs", errs))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
@@ -531,7 +513,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("remapTopVer", remapTopVer))
                     return false;
 
                 writer.incrementState();
@@ -559,7 +541,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
         switch (reader.state()) {
             case 3:
-                errBytes = reader.readByteArray("errBytes");
+                dhtNodes = reader.readCollection("dhtNodes", MessageCollectionItemType.UUID);
 
                 if (!reader.isLastRead())
                     return false;
@@ -567,7 +549,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 4:
-                failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+                errs = reader.readMessage("errs");
 
                 if (!reader.isLastRead())
                     return false;
@@ -575,7 +557,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 5:
-                futVer = reader.readMessage("futVer");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -639,7 +621,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 13:
-                remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+                remapTopVer = reader.readMessage("remapTopVer");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
new file mode 100644
index 0000000..1d415c8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class UpdateErrors implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Failed keys. */
+    @GridToStringInclude
+    @GridDirectCollection(KeyCacheObject.class)
+    private List<KeyCacheObject> failedKeys;
+
+    /** Update error. */
+    @GridDirectTransient
+    @GridToStringInclude
+    private IgniteCheckedException err;
+
+    /** Serialized update error. */
+    private byte[] errBytes;
+
+    /**
+     *
+     */
+    public UpdateErrors() {
+        // No-op.
+    }
+
+    /**
+     * @param err Error.
+     */
+    public UpdateErrors(IgniteCheckedException err) {
+        assert err != null;
+
+        this.err = err;
+    }
+
+    /**
+     * @param err Error.
+     */
+    public void onError(IgniteCheckedException err){
+        this.err = err;
+    }
+
+    /**
+     * @return Error.
+     */
+    public IgniteCheckedException error() {
+        return err;
+    }
+
+    /**
+     * @return Failed keys.
+     */
+    public Collection<KeyCacheObject> failedKeys() {
+        return failedKeys;
+    }
+
+    /**
+     * Adds key to collection of failed keys.
+     *
+     * @param key Key to add.
+     * @param e Error cause.
+     */
+    void addFailedKey(KeyCacheObject key, Throwable e) {
+        if (failedKeys == null)
+            failedKeys = new ArrayList<>();
+
+        failedKeys.add(key);
+
+        if (err == null)
+            err = new IgniteCheckedException("Failed to update keys.");
+
+        err.addSuppressed(e);
+    }
+
+    /**
+     * @param keys Keys.
+     * @param e Error.
+     */
+    void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
+        if (failedKeys == null)
+            failedKeys = new ArrayList<>(keys.size());
+
+        failedKeys.addAll(keys);
+
+        if (err == null)
+            err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+        err.addSuppressed(e);
+    }
+
+    /** {@inheritDoc} */
+    void prepareMarshal(GridCacheMessage msg, GridCacheContext cctx) throws IgniteCheckedException {
+        msg.prepareMarshalCacheObjects(failedKeys, cctx);
+
+        if (errBytes == null)
+            errBytes = U.marshal(cctx.marshaller(), err);
+    }
+
+    /** {@inheritDoc} */
+    void finishUnmarshal(GridCacheMessage msg, GridCacheContext cctx, ClassLoader ldr) throws IgniteCheckedException {
+        msg.finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
+
+        if (errBytes != null && err == null)
+            err = U.unmarshal(cctx.marshaller(), errBytes, U.resolveClassLoader(ldr, cctx.gridConfig()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(UpdateErrors.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -46;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(UpdateErrors.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 41632ef..62aecd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.io.Externalizable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -43,7 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
@@ -141,10 +142,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         List<Integer> nearValsIdxs = res.nearValuesIndexes();
         List<Integer> skipped = res.skippedIndexes();
 
-        GridCacheVersion ver = req.updateVersion();
-
-        if (ver == null)
-            ver = res.nearVersion();
+        GridCacheVersion ver = res.nearVersion();
 
         assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']';
 
@@ -194,7 +192,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                 processNearAtomicUpdateResponse(ver,
                     key,
                     val,
-                    null,
                     ttl,
                     expireTime,
                     req.keepBinary(),
@@ -212,7 +209,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
      * @param ver Version.
      * @param key Key.
      * @param val Value.
-     * @param valBytes Value bytes.
      * @param ttl TTL.
      * @param expireTime Expire time.
      * @param nodeId Node ID.
@@ -224,7 +220,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         GridCacheVersion ver,
         KeyCacheObject key,
         @Nullable CacheObject val,
-        @Nullable byte[] valBytes,
         long ttl,
         long expireTime,
         boolean keepBinary,
@@ -241,7 +236,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                 try {
                     entry = entryEx(key, topVer);
 
-                    GridCacheOperation op = (val != null || valBytes != null) ? UPDATE : DELETE;
+                    GridCacheOperation op = val != null ? UPDATE : DELETE;
 
                     GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                         ver,
@@ -299,11 +294,12 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
      * @param nodeId Sender node ID.
      * @param req Dht atomic update request.
      * @param res Dht atomic update response.
+     * @return Evicted near keys (if any).
      */
-    public void processDhtAtomicUpdateRequest(
+    @Nullable public List<KeyCacheObject> processDhtAtomicUpdateRequest(
         UUID nodeId,
         GridDhtAtomicAbstractUpdateRequest req,
-        GridDhtAtomicUpdateResponse res
+        GridDhtAtomicNearResponse res
     ) {
         GridCacheVersion ver = req.writeVersion();
 
@@ -313,6 +309,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
+        List<KeyCacheObject> nearEvicted = null;
+
         for (int i = 0; i < req.nearSize(); i++) {
             KeyCacheObject key = req.nearKey(i);
 
@@ -322,7 +320,10 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         GridCacheEntryEx entry = peekEx(key);
 
                         if (entry == null) {
-                            res.addNearEvicted(key);
+                            if (nearEvicted == null)
+                                nearEvicted = new ArrayList<>();
+
+                            nearEvicted.add(key);
 
                             break;
                         }
@@ -388,6 +389,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                 res.addFailedKey(key, new IgniteCheckedException("Failed to update near cache key: " + key, e));
             }
         }
+
+        return nearEvicted;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index b3f0684..485059f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -425,7 +425,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                                     onEntryUpdate(evt, notify, loc, recordIgniteEvt);
                                 }
-                            });
+                            }, sync);
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 5ca3da8..35fbe11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -140,7 +140,10 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
      *        before stored in cache.
      * @return Cache key object.
      */
-    public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, @Nullable GridCacheContext cctx, Object obj, boolean userObj);
+    public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx,
+        @Nullable GridCacheContext cctx,
+        Object obj,
+        boolean userObj);
 
     /**
      * @param ctx Cache context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index ff7c4ba..e0549fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -231,8 +231,8 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
                 cctx.affinity().partition(obj, false) :
                 ctx.kernalContext().affinity().partition0(ctx.cacheName(), obj, null);
         }
-        catch (IgniteCheckedException ignored) {
-            U.error(log, "Failed to get partition");
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to get partition", e);
 
             return  -1;
         }
@@ -327,13 +327,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
 
         /**
          * @param key Key.
-         */
-        UserKeyCacheObjectImpl(Object key) {
-            this(key, -1);
-        }
-
-        /**
-         * @param key Key.
+         * @param part Partition.
          */
         UserKeyCacheObjectImpl(Object key, int part) {
             super(key, null, part);
@@ -341,6 +335,8 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
 
         /**
          * @param key Key.
+         * @param valBytes Marshalled key.
+         * @param part Partition.
          */
         UserKeyCacheObjectImpl(Object key, byte[] valBytes, int part) {
             super(key, valBytes, part);
@@ -366,10 +362,10 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
 
                     Object val = ctx.processor().unmarshal(ctx, valBytes, ldr);
 
-                    return new KeyCacheObjectImpl(val, valBytes);
+                    return new KeyCacheObjectImpl(val, valBytes, partition());
                 }
 
-                return new KeyCacheObjectImpl(val, valBytes);
+                return new KeyCacheObjectImpl(val, valBytes, partition());
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException("Failed to marshal object: " + val, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 53096ab..6c85b32 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -56,7 +56,7 @@ public class StripedExecutor implements ExecutorService {
 
     /**
      * @param cnt Count.
-     * @param gridName Node name.
+     * @param igniteInstanceName Node name.
      * @param poolName Pool name.
      * @param log Logger.
      */
@@ -435,7 +435,11 @@ public class StripedExecutor implements ExecutorService {
          * Starts the stripe.
          */
         void start() {
-            thread = new IgniteThread(igniteInstanceName, poolName + "-stripe-" + idx, this);
+            thread = new IgniteThread(igniteInstanceName,
+                poolName + "-stripe-" + idx,
+                this,
+                IgniteThread.GRP_IDX_UNASSIGNED,
+                idx);
 
             thread.start();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 7abd367..96f3797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.util.future;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -118,7 +120,14 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
         }
         catch (IgniteCheckedException e) {
             if (!ignoreFailure(e)) {
-                U.error(null, "Failed to execute compound future reducer: " + this, e);
+                if (e instanceof NodeStoppingException) {
+                    IgniteLogger log = logger();
+
+                    if (log != null && log.isDebugEnabled())
+                        log.debug("Failed to execute compound future reducer, node stopped.");
+                }
+                else
+                    U.error(null, "Failed to execute compound future reducer: " + this, e);
 
                 onDone(e);
             }


Mime
View raw message