ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4705
Date Mon, 27 Feb 2017 14:37:05 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4705 9bfbedb7a -> d5b7234e8


ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d5b7234e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d5b7234e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d5b7234e

Branch: refs/heads/ignite-4705
Commit: d5b7234e86ae96b1398d5ea8b19081c5dfaaa139
Parents: 9bfbedb
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Feb 27 15:39:34 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Feb 27 17:33:53 2017 +0300

----------------------------------------------------------------------
 .../GridDhtAtomicAbstractUpdateFuture.java      |  17 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  13 +-
 .../dht/atomic/GridDhtAtomicNearResponse.java   |   9 +-
 .../GridNearAtomicAbstractUpdateFuture.java     |  23 +-
 .../GridNearAtomicSingleUpdateFuture.java       | 174 +++++++------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 212 +++++++++-------
 .../atomic/GridNearAtomicUpdateResponse.java    |  27 +-
 ...niteCacheClientNodeChangingTopologyTest.java |   3 +-
 .../IgniteCachePutRetryAbstractSelfTest.java    |   1 +
 .../atomic/IgniteCacheAtomicProtocolTest.java   | 246 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite5.java       |   3 +
 11 files changed, 529 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b7234e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 68f8abf..eb3ed04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -335,9 +335,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     return false;
             }
 
-            if (resCnt0 == mappings.size())
-                onDone();
-
             if (needReplyToNear) {
                 assert !F.isEmpty(mappings);
 
@@ -347,7 +344,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
                 GridDhtAtomicNearResponse res = new GridDhtAtomicNearResponse(cctx.cacheId(),
                     req.partition(),
-                    req.futureId(),
+                    req.nearFutureId(),
                     cctx.localNodeId(),
                     dhtNodes,
                     req.flags());
@@ -383,6 +380,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                 }
             }
 
+            if (resCnt0 == mappings.size())
+                onDone();
+
             return true;
         }
 
@@ -400,11 +400,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         GridDhtAtomicCache.UpdateReplyClosure completionCb,
         GridCacheReturn ret) {
         boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
+
         boolean needReplyToNear = repliedToNear =
             updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
             ret.hasValue() ||
             updateRes.nearVersion() != null ||
-            updateReq.nodeId().equals(cctx.localNodeId());
+            updateRes.nodeId().equals(cctx.localNodeId());
 
         List<UUID> dhtNodes = null;
 
@@ -439,7 +440,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     }
                     catch (IgniteCheckedException e) {
                         U.error(msgLog, "Failed to send mapping response [futId=" + futId +
-                            ", writeVer=" + writeVer + ", node=" + updateRes.nodeId() + ']');
+                            ", writeVer=" + writeVer +
+                            ", node=" + updateRes.nodeId() + ']');
                     }
                 }
             }
@@ -498,6 +500,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      */
     public final void onDhtErrorResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
         // TODO IGNITE-4705.
+        assert false;
     }
 
     /**
@@ -570,7 +573,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
             Map<UUID, String> dhtRes = F.viewReadOnly(mappings,
                 new IgniteClosure<GridDhtAtomicAbstractUpdateRequest, String>() {
                     @Override public String apply(GridDhtAtomicAbstractUpdateRequest req) {
-                        return "[res" + req.hasResponse() +
+                        return "[res=" + req.hasResponse() +
                             ", size=" + req.size() +
                             ", nearSize=" + req.nearSize() + ']';
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b7234e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index a0a6e4f..2f81b72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -143,7 +143,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
     /** */
     private static final boolean IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK =
-        IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK", false);
+        IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK", true);
 
     /** */
     private final ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>> defRes =
@@ -227,7 +227,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 if (req.writeSynchronizationMode() != FULL_ASYNC)
                     sendNearUpdateReply(res.nodeId(), res);
                 else {
-                    if (!F.isEmpty(res.remapKeys()))
+                    if (res.remapTopologyVersion() != null)
                         // Remap keys on primary node in FULL_ASYNC mode.
                         remapToNewPrimary(req);
                     else if (res.error() != null) {
@@ -1885,9 +1885,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (dhtFut != null)
                             ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
                     }
-                    else
+                    else {
                         // Should remap all keys.
                         remap = true;
+
+                        res.remapTopologyVersion(top.topologyVersion());
+                    }
                 }
                 finally {
                     top.readUnlock();
@@ -1938,8 +1941,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         if (remap) {
             assert dhtFut == null;
 
-            res.remapKeys(req.keys());
-
             completionCb.apply(req, res);
         }
         else {
@@ -3616,7 +3617,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             updateFut.onDhtResponse(nodeId, res);
         }
         else {
-            U.warn(msgLog, "Failed to find update future DHT atomic near response [futId=" + res.futureId() +
+            U.warn(msgLog, "Failed to find update future for DHT atomic near response [futId=" + res.futureId() +
                 ", node=" + nodeId +
                 ", res=" + res + ']');
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b7234e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index 7c2f9fe..25a64f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -26,6 +26,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -56,9 +58,11 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
     /** */
     @GridDirectCollection(UUID.class)
+    @GridToStringInclude
     private List<UUID> mapping;
 
     /** */
+    @GridToStringExclude
     private byte flags;
 
     /** */
@@ -365,6 +369,9 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridDhtAtomicNearResponse.class, this);
+        return S.toString(GridDhtAtomicNearResponse.class, this, "flags",
+            "res=" + isFlag(DHT_ATOMIC_HAS_RESULT_MASK) +
+            "|resOk=" + isFlag(DHT_ATOMIC_RESULT_SUCCESS_MASK) +
+            "|dhtFail=" + isFlag(DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE));
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b7234e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 258a0bb..3961a30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -111,21 +112,23 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     protected boolean topLocked;
 
     /** Remap count. */
+    @GridToStringInclude
     protected int remapCnt;
 
     /** Current topology version. */
+    @GridToStringInclude
     protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
 
-    /** Topology version when got mapping error. */
-    protected AffinityTopologyVersion mapErrTopVer;
-
     /** */
-    protected int resCnt;
+    @GridToStringInclude
+    protected AffinityTopologyVersion remapTopVer;
 
     /** Error. */
+    @GridToStringInclude
     protected CachePartialUpdateCheckedException err;
 
     /** Future ID. */
+    @GridToStringInclude
     protected Long futId;
 
     /** Operation result. */
@@ -363,6 +366,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         private Set<UUID> rcvd;
 
         /** */
+        @GridToStringInclude
         private Set<UUID> mapping;
 
         /** */
@@ -486,7 +490,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
             assert onRes;
 
-            if (res.error() != null || res.remapKeys() != null)
+            if (res.error() != null || res.remapTopologyVersion() != null)
                 return true;
 
             assert res.returnValue() != null : res;
@@ -501,7 +505,14 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(PrimaryRequestState.class, this);
+            return S.toString(PrimaryRequestState.class, this,
+                "node", req.nodeId(),
+                "rcvdRes", req.response() != null);
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicAbstractUpdateFuture.class, this, super.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b7234e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 3456329..6a83c6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -43,7 +43,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -122,11 +121,10 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         GridNearAtomicAbstractUpdateRequest req;
-
         GridNearAtomicUpdateResponse res = null;
-
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
+        AffinityTopologyVersion remapTopVer0 = null;
 
         synchronized (mux) {
             if (reqState == null)
@@ -151,6 +149,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 if (reqState.onNodeLeft(nodeId)) {
                     opRes0 = opRes;
                     err0 = err;
+                    remapTopVer0 = onAllReceived();
                 }
                 else
                     return false;
@@ -167,7 +166,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             onPrimaryResponse(nodeId, res, true);
         }
         else
-            finishUpdateFuture(opRes0, err0);
+            finishUpdateFuture(opRes0, err0, remapTopVer0);
 
         return false;
     }
@@ -206,6 +205,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
         GridCacheReturn opRes0;
         CachePartialUpdateCheckedException err0;
+        AffinityTopologyVersion remapTopVer0;
 
         synchronized (mux) {
             if (futId == null || futId != res.futureId())
@@ -216,18 +216,20 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             if (reqState.onMappingReceived(cctx, res)) {
                 opRes0 = opRes;
                 err0 = err;
+                remapTopVer0 = onAllReceived();
             }
             else
                 return;
         }
 
-        finishUpdateFuture(opRes0, err0);
+        finishUpdateFuture(opRes0, err0, remapTopVer0);
     }
 
     /** {@inheritDoc} */
     @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
         GridCacheReturn opRes0;
         CachePartialUpdateCheckedException err0;
+        AffinityTopologyVersion remapTopVer0;
 
         synchronized (mux) {
             if (futId == null || futId != res.futureId())
@@ -242,12 +244,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             if (reqState.onDhtResponse(cctx, nodeId, res)) {
                 opRes0 = opRes;
                 err0 = err;
+                remapTopVer0 = onAllReceived();
             }
             else
                 return;
         }
 
-        finishUpdateFuture(opRes0, err0);
+        finishUpdateFuture(opRes0, err0, remapTopVer0);
     }
 
     /** {@inheritDoc} */
@@ -255,7 +258,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
         GridNearAtomicAbstractUpdateRequest req;
 
-        AffinityTopologyVersion remapTopVer = null;
+        AffinityTopologyVersion remapTopVer0 = null;
 
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
@@ -269,11 +272,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             if (req == null)
                 return;
 
-            boolean remapKey = !F.isEmpty(res.remapKeys());
+            boolean remapKey = res.remapTopologyVersion() != null;
 
             if (remapKey) {
-                if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
-                    mapErrTopVer = req.topologyVersion();
+                assert !req.topologyVersion().equals(res.remapTopologyVersion());
+
+                assert remapTopVer == null : remapTopVer;
+
+                remapTopVer = remapTopVer0 = res.remapTopologyVersion();
             }
             else if (res.error() != null) {
                 // TODO IGNITE-4705: assert only 1 key?
@@ -314,44 +320,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                     return;
             }
 
-            if (remapKey) {
-                assert mapErrTopVer != null;
-
-                remapTopVer = cctx.shared().exchange().topologyVersion();
-            }
-            else {
-                if (err != null &&
-                    X.hasCause(err, CachePartialUpdateCheckedException.class) &&
-                    X.hasCause(err, ClusterTopologyCheckedException.class) &&
-                    storeFuture() &&
-                    --remapCnt > 0) {
-                    ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
-
-                    if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
-                        CachePartialUpdateCheckedException cause =
-                            X.cause(err, CachePartialUpdateCheckedException.class);
-
-                        assert cause != null && cause.topologyVersion() != null : err;
-
-                        remapTopVer =
-                            new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
-
-                        err = null;
-                    }
-                }
-            }
+            remapTopVer0 = onAllReceived();
 
-            if (remapTopVer == null) {
+            if (remapTopVer0 == null) {
                 err0 = err;
                 opRes0 = opRes;
             }
-            else {
-                cctx.mvcc().removeAtomicFuture(futId);
-
-                reqState = null;
-                futId = null;
-                topVer = AffinityTopologyVersion.ZERO;
-            }
         }
 
         if (res.error() != null && res.failedKeys() == null) {
@@ -363,49 +337,91 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         if (nearEnabled && !nodeErr)
             updateNear(req, res);
 
-        if (remapTopVer != null) {
-            if (!waitTopFut) {
-                onDone(new GridCacheTryPutFailedException());
+        if (remapTopVer0 != null) {
+            waitAndRemap(remapTopVer0);
 
-                return;
-            }
+            return;
+        }
 
-            if (topLocked) {
-                CachePartialUpdateCheckedException e =
-                    new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+        onDone(opRes0, err0);
+    }
 
-                ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
-                    "Failed to update keys, topology changed while execute atomic update inside transaction.");
+    private AffinityTopologyVersion onAllReceived() {
+        AffinityTopologyVersion remapTopVer0 = null;
 
-                cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+        if (remapTopVer == null) {
+            if (err != null &&
+                X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+                X.hasCause(err, ClusterTopologyCheckedException.class) &&
+                storeFuture() &&
+                --remapCnt > 0) {
+                ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
 
-                e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause);
+                if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                    CachePartialUpdateCheckedException cause =
+                        X.cause(err, CachePartialUpdateCheckedException.class);
 
-                onDone(e);
+                    assert cause != null && cause.topologyVersion() != null : err;
 
-                return;
+                    remapTopVer0 = new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+
+                    err = null;
+                }
             }
+        }
+        else
+            remapTopVer0 = remapTopVer;
 
-            IgniteInternalFuture<AffinityTopologyVersion> fut =
-                cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+        if (remapTopVer0 != null) {
+            cctx.mvcc().removeAtomicFuture(futId);
 
-            if (fut == null)
-                fut = new GridFinishedFuture<>(remapTopVer);
+            reqState = null;
+            futId = null;
+            topVer = AffinityTopologyVersion.ZERO;
 
-            fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                        @Override public void run() {
-                            mapOnTopology();
-                        }
-                    });
-                }
-            });
+            remapTopVer = null;
+        }
+
+        return remapTopVer0;
+    }
+
+    private void waitAndRemap(AffinityTopologyVersion remapTopVer) {
+        if (!waitTopFut) {
+            onDone(new GridCacheTryPutFailedException());
 
             return;
         }
 
-        onDone(opRes0, err0);
+        if (topLocked) {
+            CachePartialUpdateCheckedException e =
+                new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+            ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+                "Failed to update keys, topology changed while execute atomic update inside transaction.");
+
+            cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+            e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause);
+
+            onDone(e);
+
+            return;
+        }
+
+        IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+        if (fut == null)
+            fut = new GridFinishedFuture<>(remapTopVer);
+
+        fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+            @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        mapOnTopology();
+                    }
+                });
+            }
+        });
     }
 
     /**
@@ -417,7 +433,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
         assert nearEnabled;
 
-        if (res.remapKeys() != null || !req.hasPrimary())
+        if (res.remapTopologyVersion() != null || !req.hasPrimary())
             return;
 
         GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
@@ -496,8 +512,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 assert this.futId.equals(futId) || (this.isDone() && this.error() != null);
                 assert this.topVer == topVer;
 
-                resCnt = 0;
-
                 reqState = new PrimaryRequestState(singleReq0);
             }
         }
@@ -662,13 +676,21 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @param opRes Operation result.
      * @param err Operation error.
      */
-    private void finishUpdateFuture(GridCacheReturn opRes, CachePartialUpdateCheckedException err) {
+    private void finishUpdateFuture(GridCacheReturn opRes,
+        CachePartialUpdateCheckedException err,
+        @Nullable AffinityTopologyVersion remapTopVer) {
         if (nearEnabled) {
             assert reqState.req.response() != null;
 
             updateNear(reqState.req, reqState.req.response());
         }
 
+        if (remapTopVer != null) {
+            waitAndRemap(remapTopVer);
+
+            return;
+        }
+
         onDone(opRes, err);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b7234e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 7e9fa05..3d72c99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -81,11 +81,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     private Map<UUID, PrimaryRequestState> mappings;
 
     /** Keys to remap. */
+    @GridToStringInclude
     private Collection<KeyCacheObject> remapKeys;
 
     /** Not null is operation is mapped to single node. */
+    @GridToStringInclude
     private PrimaryRequestState singleReq;
 
+    /** */
+    private int resCnt;
+
     /**
      * @param cctx Cache context.
      * @param cache Cache instance.
@@ -153,10 +158,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     @Override public boolean onNodeLeft(UUID nodeId) {
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
-
         GridNearAtomicUpdateResponse res = null;
-
         GridNearAtomicAbstractUpdateRequest req;
+        AffinityTopologyVersion remapTopVer0 = null;
 
         synchronized (mux) {
             if (singleReq != null) {
@@ -166,6 +170,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     if (singleReq.onNodeLeft(nodeId)) {
                         opRes0 = opRes;
                         err0 = err;
+                        remapTopVer0 = onAllReceived();
                     }
                     else
                         return false;
@@ -194,6 +199,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                                     rcvAll = true;
 
+                                    remapTopVer0 = onAllReceived();
+
                                     break;
                                 }
                             }
@@ -220,7 +227,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             onPrimaryResponse(nodeId, res, true);
         }
         else
-            finishUpdateFuture(opRes0, err0);
+            finishUpdateFuture(opRes0, err0, remapTopVer0);
 
         return false;
     }
@@ -283,6 +290,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
         GridCacheReturn opRes0;
         CachePartialUpdateCheckedException err0;
+        AffinityTopologyVersion remapTopVer0 = null;
 
         synchronized (mux) {
             if (futId == null || futId != res.futureId())
@@ -294,6 +302,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 if (singleReq.onMappingReceived(cctx, res)) {
                     opRes0 = opRes;
                     err0 = err;
+                    remapTopVer0 = onAllReceived();
                 }
                 else
                     return;
@@ -309,6 +318,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     if (mappings.size() == resCnt) {
                         opRes0 = opRes;
                         err0 = err;
+                        remapTopVer0 = onAllReceived();
                     }
                     else
                         return;
@@ -318,13 +328,14 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
         }
 
-        finishUpdateFuture(opRes0, err0);
+        finishUpdateFuture(opRes0, err0, remapTopVer0);
     }
 
     /** {@inheritDoc} */
     @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
         GridCacheReturn opRes0;
         CachePartialUpdateCheckedException err0;
+        AffinityTopologyVersion remapTopVer0 = null;
 
         synchronized (mux) {
             if (futId == null || futId != res.futureId())
@@ -341,6 +352,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 if (singleReq.onDhtResponse(cctx, nodeId, res)) {
                     opRes0 = opRes;
                     err0 = err;
+                    remapTopVer0 = onAllReceived();
                 }
                 else
                     return;
@@ -360,6 +372,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                         if (mappings.size() == resCnt) {
                             opRes0 = opRes;
                             err0 = err;
+                            remapTopVer0 = onAllReceived();
                         }
                         else
                             return;
@@ -372,7 +385,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
         }
 
-        finishUpdateFuture(opRes0, err0);
+        finishUpdateFuture(opRes0, err0, remapTopVer0);
     }
 
     /** {@inheritDoc} */
@@ -380,7 +393,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
         GridNearAtomicAbstractUpdateRequest req;
 
-        AffinityTopologyVersion remapTopVer = null;
+        AffinityTopologyVersion remapTopVer0 = null;
 
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
@@ -427,14 +440,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
             assert req != null && req.topologyVersion().equals(topVer) : req;
 
-            if (res.remapKeys() != null) {
+            if (res.remapTopologyVersion() != null) {
+                assert !req.topologyVersion().equals(res.remapTopologyVersion());
+
                 if (remapKeys == null)
-                    remapKeys = U.newHashSet(res.remapKeys().size());
+                    remapKeys = U.newHashSet(req.size());
 
-                remapKeys.addAll(res.remapKeys());
+                remapKeys.addAll(req.keys());
 
-                if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
-                    mapErrTopVer = req.topologyVersion();
+                if (remapTopVer == null || remapTopVer.compareTo(res.remapTopologyVersion()) < 0)
+                    remapTopVer = req.topologyVersion();
             }
             else if (res.error() != null) {
                 if (res.failedKeys() != null) {
@@ -470,50 +485,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
 
             if (rcvAll) {
-                if (remapKeys != null) {
-                    assert mapErrTopVer != null;
-
-                    remapTopVer = cctx.shared().exchange().topologyVersion();
-                }
-                else {
-                    if (err != null &&
-                        X.hasCause(err, CachePartialUpdateCheckedException.class) &&
-                        X.hasCause(err, ClusterTopologyCheckedException.class) &&
-                        storeFuture() &&
-                        --remapCnt > 0) {
-                        ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
+                remapTopVer0 = onAllReceived();
 
-                        if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
-                            CachePartialUpdateCheckedException cause =
-                                X.cause(err, CachePartialUpdateCheckedException.class);
-
-                            assert cause != null && cause.topologyVersion() != null : err;
-
-                            remapTopVer =
-                                new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
-
-                            err = null;
-
-                            Collection<Object> failedKeys = cause.failedKeys();
-
-                            remapKeys = new ArrayList<>(failedKeys.size());
-
-                            for (Object key : failedKeys)
-                                remapKeys.add(cctx.toCacheKeyObject(key));
-                        }
-                    }
-                }
-
-                if (remapTopVer == null) {
+                if (remapTopVer0 == null) {
                     err0 = err;
                     opRes0 = opRes;
                 }
-                else {
-                    cctx.mvcc().removeAtomicFuture(futId);
-
-                    futId = null;
-                    topVer = AffinityTopologyVersion.ZERO;
-                }
             }
         }
 
@@ -537,60 +514,117 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 updateNear(req, res);
         }
 
-        if (remapTopVer != null) {
-            if (!waitTopFut) {
-                onDone(new GridCacheTryPutFailedException());
+        if (remapTopVer0 != null) {
+            waitAndRemap(remapTopVer0);
 
-                return;
-            }
+            return;
+        }
 
-            if (topLocked) {
-                assert !F.isEmpty(remapKeys) : remapKeys;
+        if (rcvAll)
+            onDone(opRes0, err0);
+    }
 
-                CachePartialUpdateCheckedException e =
-                    new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+    private void waitAndRemap(AffinityTopologyVersion remapTopVer) {
+        assert remapTopVer != null;
 
-                ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
-                    "Failed to update keys, topology changed while execute atomic update inside transaction.");
+        if (!waitTopFut) {
+            onDone(new GridCacheTryPutFailedException());
 
-                cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+            return;
+        }
 
-                e.add(remapKeys, cause);
+        if (topLocked) {
+            assert !F.isEmpty(remapKeys) : remapKeys;
 
-                onDone(e);
+            CachePartialUpdateCheckedException e =
+                new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
 
-                return;
-            }
+            ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+                "Failed to update keys, topology changed while execute atomic update inside transaction.");
 
-            IgniteInternalFuture<AffinityTopologyVersion> fut =
-                cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+            cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
 
-            if (fut == null)
-                fut = new GridFinishedFuture<>(remapTopVer);
+            e.add(remapKeys, cause);
 
-            fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                        @Override public void run() {
-                            mapOnTopology();
-                        }
-                    });
-                }
-            });
+            onDone(e);
 
             return;
         }
 
-        if (rcvAll)
-            onDone(opRes0, err0);
+        IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+        if (fut == null)
+            fut = new GridFinishedFuture<>(remapTopVer);
+
+        fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+            @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        mapOnTopology();
+                    }
+                });
+            }
+        });
     }
 
+    private AffinityTopologyVersion onAllReceived() {
+        AffinityTopologyVersion remapTopVer0 = null;
+
+        if (remapKeys != null) {
+            assert remapTopVer != null;
+
+            remapTopVer0 = remapTopVer;
+        }
+        else {
+            if (err != null &&
+                X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+                X.hasCause(err, ClusterTopologyCheckedException.class) &&
+                storeFuture() &&
+                --remapCnt > 0) {
+                ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
+
+                if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                    CachePartialUpdateCheckedException cause =
+                        X.cause(err, CachePartialUpdateCheckedException.class);
+
+                    assert cause != null && cause.topologyVersion() != null : err;
+                    assert remapKeys == null;
+                    assert remapTopVer == null;
+
+                    remapTopVer = remapTopVer0 =
+                        new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+
+                    err = null;
+
+                    Collection<Object> failedKeys = cause.failedKeys();
+
+                    remapKeys = new ArrayList<>(failedKeys.size());
+
+                    for (Object key : failedKeys)
+                        remapKeys.add(cctx.toCacheKeyObject(key));
+                }
+            }
+        }
+
+        if (remapTopVer0 != null) {
+            cctx.mvcc().removeAtomicFuture(futId);
+
+            futId = null;
+            topVer = AffinityTopologyVersion.ZERO;
+
+            remapTopVer = null;
+        }
+
+        return remapTopVer0;
+    }
 
     /**
      * @param opRes Operation result.
      * @param err Operation error.
      */
-    private void finishUpdateFuture(GridCacheReturn opRes, CachePartialUpdateCheckedException err) {
+    private void finishUpdateFuture(GridCacheReturn opRes,
+        CachePartialUpdateCheckedException err,
+        @Nullable AffinityTopologyVersion remapTopVer) {
         if (nearEnabled) {
             if (mappings != null) {
                 for (PrimaryRequestState reqState : mappings.values()) {
@@ -608,6 +642,14 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
         }
 
+        if (remapTopVer != null) {
+            assert !F.isEmpty(remapKeys);
+
+            waitAndRemap(remapTopVer);
+
+            return;
+        }
+
         onDone(opRes, err);
     }
 
@@ -620,7 +662,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
         assert nearEnabled;
 
-        if (res.remapKeys() != null)
+        if (res.remapTopologyVersion() != null)
             return;
 
         GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
@@ -797,8 +839,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 assert this.futId.equals(futId) || (this.isDone() && this.error() != null);
                 assert this.topVer == topVer;
 
-//                this.updVer = updVer;
-
                 resCnt = 0;
 
                 singleReq = singleReq0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b7234e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 1fabead..b134c53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -78,10 +79,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     @GridDirectCollection(KeyCacheObject.class)
     private volatile Collection<KeyCacheObject> failedKeys;
 
-    /** Keys that should be remapped. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private List<KeyCacheObject> remapKeys;
+    /** */
+    private AffinityTopologyVersion remapTopVer;
 
     /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
     @GridDirectCollection(int.class)
@@ -216,17 +215,17 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     }
 
     /**
-     * @param remapKeys Remap keys.
+     * @param remapTopVer Topology version to remap update.
      */
-    public void remapKeys(List<KeyCacheObject> remapKeys) {
-        this.remapKeys = remapKeys;
+    public void remapTopologyVersion(AffinityTopologyVersion remapTopVer) {
+        this.remapTopVer = remapTopVer;
     }
 
     /**
-     * @return Remap keys.
+     * @return Topology version if update should be remapped.
      */
-    public Collection<KeyCacheObject> remapKeys() {
-        return remapKeys;
+    @Nullable public AffinityTopologyVersion remapTopologyVersion() {
+        return remapTopVer;
     }
 
     /**
@@ -429,8 +428,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
         prepareMarshalCacheObjects(failedKeys, cctx);
 
-        prepareMarshalCacheObjects(remapKeys, cctx);
-
         prepareMarshalCacheObjects(nearVals, cctx);
 
         if (ret != null)
@@ -448,8 +445,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
         finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
 
-        finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
-
         finishUnmarshalCacheObjects(nearVals, cctx, ldr);
 
         if (ret != null)
@@ -553,7 +548,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("remapTopVer", remapTopVer))
                     return false;
 
                 writer.incrementState();
@@ -669,7 +664,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 14:
-                remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+                remapTopVer = reader.readMessage("remapTopVer");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b7234e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 16c31d5..e3c45be 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -132,6 +132,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
         TestCommunicationSpi commSpi = new TestCommunicationSpi();
 
         commSpi.setSharedMemoryPort(-1);
+        commSpi.setIdleConnectionTimeout(1000);
 
         cfg.setCommunicationSpi(commSpi);
 
@@ -1693,7 +1694,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
      * @throws Exception If failed.
      */
     public void testAtomicPrimaryPutAllMultinode() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1685");
+        //fail("https://issues.apache.org/jira/browse/IGNITE-1685");
 
         multinode(PRIMARY, TestType.PUT_ALL);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b7234e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 096d631..a751e85 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -120,6 +120,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
 
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setIdleConnectionTimeout(1000);
 
         AtomicConfiguration acfg = new AtomicConfiguration();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b7234e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
new file mode 100644
index 0000000..5b084d9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String TEST_CACHE = "testCache";
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+        commSpi.setIdleConnectionTimeout(1000);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllPrimaryFailure1() throws Exception {
+        putAllPrimaryFailure(true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllPrimaryFailure2() throws Exception {
+        putAllPrimaryFailure(true, true);
+    }
+
+    /**
+     * @param fail0 Fail node 0 flag.
+     * @param fail1 Fail node 1 flag.
+     * @throws Exception If failed.
+     */
+    private void putAllPrimaryFailure(boolean fail0, boolean fail1) throws Exception {
+        startGrids(4);
+
+        client = true;
+
+        Ignite client = startGrid(4);
+
+        IgniteCache<Integer, Integer> nearCache = client.createCache(cacheConfiguration(1));
+        IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+        awaitPartitionMapExchange();
+
+        Ignite srv0 = ignite(0);
+        Ignite srv1 = ignite(1);
+
+        Integer key1 = primaryKey(srv0.cache(TEST_CACHE));
+        Integer key2 = primaryKey(srv1.cache(TEST_CACHE));
+
+        Map<Integer, Integer> map = new HashMap<>();
+        map.put(key1, key1);
+        map.put(key2, key2);
+
+        assertEquals(2, map.size());
+
+        if (fail0)
+            testSpi(client).blockMessages(GridNearAtomicFullUpdateRequest.class, srv0.name());
+        if (fail1)
+            testSpi(client).blockMessages(GridNearAtomicFullUpdateRequest.class, srv1.name());
+
+        log.info("Start put [key1=" + key1 + ", key2=" + key2 + ']');
+
+        nearAsyncCache.putAll(map);
+
+        IgniteFuture<?> fut = nearAsyncCache.future();
+
+        U.sleep(500);
+
+        assertFalse(fut.isDone());
+
+        if (fail0)
+            stopGrid(0);
+        if (fail1)
+            stopGrid(1);
+
+        fut.get();
+
+        checkData(map);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllBackupFailure1() throws Exception {
+        startGrids(4);
+
+        client = true;
+
+        Ignite client = startGrid(4);
+
+        IgniteCache<Integer, Integer> nearCache = client.createCache(cacheConfiguration(1));
+        IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+        awaitPartitionMapExchange();
+
+        Ignite srv0 = ignite(0);
+
+        List<Integer> keys = primaryKeys(srv0.cache(TEST_CACHE), 3);
+
+        Ignite backup = backup(client.affinity(TEST_CACHE), keys.get(0));
+
+        testSpi(backup).blockMessages(GridDhtAtomicNearResponse.class, client.name());
+
+        Map<Integer, Integer> map = new HashMap<>();
+
+        for (Integer key : keys)
+            map.put(key, key);
+
+        log.info("Start put [map=" + map + ']');
+
+        nearAsyncCache.putAll(map);
+
+        IgniteFuture<?> fut = nearAsyncCache.future();
+
+        U.sleep(500);
+
+        assertFalse(fut.isDone());
+
+        stopGrid(backup.name());
+
+        fut.get();
+
+        checkData(map);
+    }
+
+    /**
+     * @param expData Expected cache data.
+     */
+    private void checkData(Map<Integer, Integer> expData) {
+        List<Ignite> nodes = G.allGrids();
+
+        assertFalse(nodes.isEmpty());
+
+        for (Ignite node : nodes) {
+            IgniteCache<Integer, Integer> cache = node.cache(TEST_CACHE);
+
+            for (Map.Entry<Integer, Integer> e : expData.entrySet())
+                assertEquals(e.getValue(), cache.get(e.getKey()));
+        }
+    }
+
+    /**
+     * @param aff Affinity.
+     * @param key Key.
+     * @return Backup node for given key.
+     */
+    private Ignite backup(Affinity<Object> aff, Object key) {
+        for (Ignite ignite : G.allGrids()) {
+            ClusterNode node = ignite.cluster().localNode();
+
+            if (aff.isPrimaryOrBackup(node, key) && !aff.isPrimary(node, key))
+                return ignite;
+        }
+
+        fail("Failed to find backup for key: " + key);
+
+        return null;
+    }
+
+    /**
+     * @param node Node.
+     * @return Node communication SPI.
+     */
+    private TestRecordingCommunicationSpi testSpi(Ignite node) {
+        return (TestRecordingCommunicationSpi)node.configuration().getCommunicationSpi();
+    }
+
+    /**
+     * @param backups Number of backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(int backups) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName(TEST_CACHE);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b7234e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index 144aac6..1ef5b84 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinity
 import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxIteratorSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.IgniteCacheAtomicProtocolTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheSyncRebalanceModeSelfTest;
 import org.apache.ignite.internal.processors.cache.store.IgniteCacheWriteBehindNoUpdateSelfTest;
 
@@ -65,6 +66,8 @@ public class IgniteCacheTestSuite5 extends TestSuite {
 
         suite.addTestSuite(CacheRebalancingSelfTest.class);
 
+        suite.addTestSuite(IgniteCacheAtomicProtocolTest.class);
+
         return suite;
     }
 }


Mime
View raw message