ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [02/18] ignite git commit: ignite-1124 Changed synchronization in GridNearAtomicUpdateFuture to avoid races, added tests for retries
Date Mon, 31 Aug 2015 14:10:55 GMT
ignite-1124 Changed synchronization in GridNearAtomicUpdateFuture to avoid races, added tests for retries


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8b625a3a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8b625a3a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8b625a3a

Branch: refs/heads/ignite-843
Commit: 8b625a3a9ac602f8374b74a46ff70c88ef1c9014
Parents: 9f7dc50
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Aug 31 09:14:28 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Aug 31 09:14:28 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAtomicFuture.java |    5 -
 .../processors/cache/GridCacheMvccManager.java  |   62 +-
 .../dht/atomic/GridDhtAtomicCache.java          |    6 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |    7 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 1235 +++++++++---------
 .../util/future/GridCompoundFuture.java         |    3 +-
 .../apache/ignite/internal/util/typedef/X.java  |    1 +
 ...eCacheEntryListenerEagerTtlDisabledTest.java |    4 +-
 .../IgniteCachePutRetryAbstractSelfTest.java    |  282 +++-
 .../dht/IgniteCachePutRetryAtomicSelfTest.java  |   77 +-
 ...gniteCachePutRetryTransactionalSelfTest.java |   14 +-
 ...acheAtomicReplicatedNodeRestartSelfTest.java |    5 -
 .../tcp/IgniteCacheSslStartStopSelfTest.java    |    9 +-
 13 files changed, 948 insertions(+), 762 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8b625a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 8724d3a..e64a9e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -27,11 +27,6 @@ import java.util.*;
  */
 public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
     /**
-     * @return Future topology version.
-     */
-    public AffinityTopologyVersion topologyVersion();
-
-    /**
      * Gets future that will be completed when it is safe when update is finished on the given version of topology.
      *
      * @param topVer Topology version to finish.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8b625a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 6a8c6fe..bbac42b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -196,8 +196,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
 
                     cacheFut.onNodeLeft(discoEvt.eventNode().id());
 
-                    if (cacheFut.isCancelled() || cacheFut.isDone())
-                        atomicFuts.remove(cacheFut.futureId(), fut);
+                    if (cacheFut.isCancelled() || cacheFut.isDone()) {
+                        GridCacheVersion futVer = cacheFut.version();
+
+                        if (futVer != null)
+                            atomicFuts.remove(futVer, fut);
+                    }
                 }
             }
         }
@@ -347,16 +351,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @param fut Future to check.
-     * @return {@code True} if future is registered.
-     */
-    public boolean hasFuture(GridCacheFuture<?> fut) {
-        assert fut != null;
-
-        return future(fut.version(), fut.futureId()) != null;
-    }
-
-    /**
      * @param futVer Future ID.
      * @param fut Future.
      */
@@ -565,6 +559,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param ver Version.
      * @return All futures for given lock version.
      */
+    @SuppressWarnings("unchecked")
     public <T> Collection<? extends IgniteInternalFuture<T>> futures(GridCacheVersion ver) {
         Collection c = futs.get(ver);
 
@@ -572,6 +567,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param cacheCtx Cache context.
      * @param ver Lock version to check.
      * @return {@code True} if lock had been removed.
      */
@@ -580,6 +576,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param cacheCtx Cache context.
      * @param ver Obsolete entry version.
      * @return {@code True} if added.
      */
@@ -688,27 +685,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @param keys Keys.
-     * @param base Base version.
-     * @return Versions that are less than {@code base} whose keys are in the {@code keys} collection.
-     */
-    public Collection<GridCacheVersion> localDhtPendingVersions(Collection<KeyCacheObject> keys, GridCacheVersion base) {
-        Collection<GridCacheVersion> lessPending = new GridLeanSet<>(5);
-
-        for (GridCacheMvccCandidate cand : dhtLocCands) {
-            if (cand.version().isLess(base)) {
-                if (keys.contains(cand.key()))
-                    lessPending.add(cand.version());
-            }
-            else
-                break;
-        }
-
-        return lessPending;
-    }
-
-    /**
-     *
+     * @param cacheCtx Cache context.
      * @param cand Cache lock candidate to add.
      * @return {@code True} if added as a result of this operation,
      *      {@code false} if was previously added.
@@ -924,24 +901,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         X.println(">>>   finishFutsSize: " + finishFuts.size());
     }
 
-
-    /**
-     * @param nodeId Node ID.
-     * @return Filter.
-     */
-    private IgnitePredicate<GridCacheMvccCandidate> nodeIdFilter(final UUID nodeId) {
-        if (nodeId == null)
-            return F.alwaysTrue();
-
-        return new P1<GridCacheMvccCandidate>() {
-            @Override public boolean apply(GridCacheMvccCandidate c) {
-                UUID otherId = c.otherNodeId();
-
-                return c.nodeId().equals(nodeId) || (otherId != null && otherId.equals(nodeId));
-            }
-        };
-    }
-
     /**
      * @param topVer Topology version.
      * @return Future that signals when all locks for given partitions are released.
@@ -994,6 +953,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      *
      * @return Finish update future.
      */
+    @SuppressWarnings("unchecked")
     public IgniteInternalFuture<?> finishAtomicUpdates(AffinityTopologyVersion topVer) {
         GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8b625a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4b8585e..0985ae3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -135,6 +135,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
             @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
                 if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
+                    assert req.writeSynchronizationMode() != FULL_ASYNC : req;
+
                     // Always send reply in CLOCK ordering mode.
                     sendNearUpdateReply(res.nodeId(), res);
 
@@ -2247,6 +2249,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param req Request to remap.
      */
     private void remapToNewPrimary(GridNearAtomicUpdateRequest req) {
+        assert req.writeSynchronizationMode() == FULL_ASYNC : req;
+
         if (log.isDebugEnabled())
             log.debug("Remapping near update request locally: " + req);
 
@@ -2279,7 +2283,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             drRmvVals = null;
         }
         else {
-            assert req.operation() == DELETE;
+            assert req.operation() == DELETE : req;
 
             drRmvVals = req.conflictVersions();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8b625a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4b1a58f..04128b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -168,13 +168,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     }
 
     /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return updateReq.topologyVersion();
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
-        if (waitForExchange && topologyVersion().compareTo(topVer) < 0)
+        if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
             return this;
 
         return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8b625a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 07ec808..3f22808 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
@@ -36,11 +35,9 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
-import org.jsr166.*;
 
 import javax.cache.expiry.*;
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
@@ -64,9 +61,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     /** Cache. */
     private GridDhtAtomicCache cache;
 
-    /** Future ID. */
-    private volatile GridCacheVersion futVer;
-
     /** Update operation. */
     private final GridCacheOperation op;
 
@@ -88,55 +82,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Collection<GridCacheVersion> conflictRmvVals;
 
-    /** Mappings. */
-    @GridToStringInclude
-    private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
-
-    /** Error. */
-    private volatile CachePartialUpdateCheckedException err;
-
-    /** Operation result. */
-    private volatile GridCacheReturn opRes;
-
     /** Return value require flag. */
     private final boolean retval;
 
     /** Expiry policy. */
     private final ExpiryPolicy expiryPlc;
 
-    /** Future map topology version. */
-    private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
-
-    /** Completion future for a particular topology version. */
-    private GridFutureAdapter<Void> topCompleteFut;
-
     /** Optional filter. */
     private final CacheEntryPredicate[] filter;
 
     /** Write synchronization mode. */
     private final CacheWriteSynchronizationMode syncMode;
 
-    /** If this future mapped to single node. */
-    private volatile Boolean single;
-
-    /** If this future is mapped to a single node, this field will contain that node ID. */
-    private UUID singleNodeId;
-
-    /** Single update request. */
-    private GridNearAtomicUpdateRequest singleReq;
-
     /** Raw return value flag. */
     private final boolean rawRetval;
 
     /** Fast map flag. */
     private final boolean fastMap;
 
-    /** */
-    private boolean fastMapRemap;
-
-    /** */
-    private GridCacheVersion updVer;
-
     /** Near cache flag. */
     private final boolean nearEnabled;
 
@@ -156,7 +119,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     private final boolean waitTopFut;
 
     /** Remap count. */
-    private AtomicInteger remapCnt;
+    private int remapCnt;
+
+    /** State. */
+    private final UpdateState state;
 
     /**
      * @param cctx Cache context.
@@ -175,6 +141,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip store flag.
+     * @param remapCnt Maximum number of retries.
+     * @param waitTopFut If {@code false} does not wait for affinity change future.
      */
     public GridNearAtomicUpdateFuture(
         GridCacheContext cctx,
@@ -223,8 +191,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
 
-        mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
-
         fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
             cctx.config().getAtomicWriteOrderMode() == CLOCK &&
             !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
@@ -234,22 +200,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         if (!waitTopFut)
             remapCnt = 1;
 
-        this.remapCnt = new AtomicInteger(remapCnt);
+        this.remapCnt = remapCnt;
+
+        state = new UpdateState();
     }
 
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
-        return futVer.asGridUuid();
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public GridCacheVersion version() {
-        return futVer;
+        return state.futureVersion();
     }
 
     /** {@inheritDoc} */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
+        throw new UnsupportedOperationException();
     }
 
     /**
@@ -261,45 +229,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     }
 
     /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
-    /** {@inheritDoc} */
     @Override public Collection<?> keys() {
         return keys;
     }
 
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
-        Boolean single0 = single;
-
-        if (single0 != null && single0) {
-            if (singleNodeId.equals(nodeId)) {
-                onDone(addFailedKeys(
-                    singleReq.keys(),
-                    singleReq.topologyVersion(),
-                    new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId)));
-
-                return true;
-            }
-
-            return false;
-        }
-
-        GridNearAtomicUpdateRequest req = mappings.get(nodeId);
-
-        if (req != null) {
-            addFailedKeys(req.keys(),
-                req.topologyVersion(),
-                new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId));
-
-            mappings.remove(nodeId);
-
-            checkComplete();
-
-            return true;
-        }
+        state.onNodeLeft(nodeId);
 
         return false;
     }
@@ -329,142 +265,34 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
 
         if (topVer == null)
-            mapOnTopology(null, false, null);
+            mapOnTopology();
         else {
             topLocked = true;
 
             // Cannot remap.
-            remapCnt.set(1);
+            remapCnt = 1;
 
-            map0(topVer, null, false, null);
+            state.map(topVer);
         }
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
-        if (waitForPartitionExchange() && topologyVersion().compareTo(topVer) < 0) {
-            GridFutureAdapter<Void> fut = null;
-
-            synchronized (this) {
-                if (this.topVer == AffinityTopologyVersion.ZERO)
-                    return null;
+        if (waitForPartitionExchange()) {
+            GridFutureAdapter<Void> fut = state.completeFuture(topVer);
 
-                if (this.topVer.compareTo(topVer) < 0) {
-                    if (topCompleteFut == null)
-                        topCompleteFut = new GridFutureAdapter<>();
+            if (fut != null && isDone()) {
+                fut.onDone();
 
-                    fut = topCompleteFut;
-                }
+                return null;
             }
 
-            if (fut != null && isDone())
-                fut.onDone();
-
             return fut;
         }
 
         return null;
     }
 
-    /**
-     * @param failed Keys to remap.
-     * @param errTopVer Topology version for failed update.
-     */
-    private void remap(Collection<?> failed, AffinityTopologyVersion errTopVer) {
-        assert errTopVer != null;
-
-        GridCacheVersion futVer0 = futVer;
-
-        if (futVer0 == null || cctx.mvcc().removeAtomicFuture(futVer0) == null)
-            return;
-
-        Collection<Object> remapKeys = new ArrayList<>(failed.size());
-        Collection<Object> remapVals = vals != null ? new ArrayList<>(failed.size()) : null;
-        Collection<GridCacheDrInfo> remapConflictPutVals = conflictPutVals != null ? new ArrayList<GridCacheDrInfo>(failed.size()) : null;
-        Collection<GridCacheVersion> remapConflictRmvVals = conflictRmvVals != null ? new ArrayList<GridCacheVersion>(failed.size()) : null;
-
-        Iterator<?> keyIt = keys.iterator();
-        Iterator<?> valsIt = vals != null ? vals.iterator() : null;
-        Iterator<GridCacheDrInfo> conflictPutValsIt = conflictPutVals != null ? conflictPutVals.iterator() : null;
-        Iterator<GridCacheVersion> conflictRmvValsIt = conflictRmvVals != null ? conflictRmvVals.iterator() : null;
-
-        for (Object key : failed) {
-            while (keyIt.hasNext()) {
-                Object nextKey = keyIt.next();
-                Object nextVal = valsIt != null ? valsIt.next() : null;
-                GridCacheDrInfo nextConflictPutVal = conflictPutValsIt != null ? conflictPutValsIt.next() : null;
-                GridCacheVersion nextConflictRmvVal = conflictRmvValsIt != null ? conflictRmvValsIt.next() : null;
-
-                if (F.eq(key, nextKey)) {
-                    remapKeys.add(nextKey);
-
-                    if (remapVals != null)
-                        remapVals.add(nextVal);
-
-                    if (remapConflictPutVals != null)
-                        remapConflictPutVals.add(nextConflictPutVal);
-
-                    if (remapConflictRmvVals != null)
-                        remapConflictRmvVals.add(nextConflictRmvVal);
-
-                    break;
-                }
-            }
-        }
-
-        keys = remapKeys;
-        vals = remapVals;
-        conflictPutVals = remapConflictPutVals;
-        conflictRmvVals = remapConflictRmvVals;
-
-        single = null;
-        futVer = null;
-        err = null;
-        opRes = null;
-
-        GridFutureAdapter<Void> fut0;
-
-        synchronized (this) {
-            mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
-
-            assert topVer != null && topVer.topologyVersion() > 0 : this;
-
-            topVer = AffinityTopologyVersion.ZERO;
-
-            fut0 = topCompleteFut;
-
-            topCompleteFut = null;
-        }
-
-        if (fut0 != null)
-            fut0.onDone();
-
-        singleNodeId = null;
-        singleReq = null;
-        fastMapRemap = false;
-        updVer = null;
-        topLocked = false;
-
-        IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(errTopVer.topologyVersion() + 1);
-
-        fut.listen(new CI1<IgniteInternalFuture<?>>() {
-            @Override public void apply(final IgniteInternalFuture<?> fut) {
-                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                    @Override public void run() {
-                        try {
-                            fut.get();
-
-                            map();
-                        }
-                        catch (IgniteCheckedException e) {
-                            onDone(e);
-                        }
-                    }
-                });
-            }
-        });
-    }
-
     /** {@inheritDoc} */
     @SuppressWarnings("ConstantConditions")
     @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
@@ -478,35 +306,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         if (op == TRANSFORM && retval == null)
             retval = Collections.emptyMap();
 
-        if (err != null && X.hasCause(err, CachePartialUpdateCheckedException.class) &&
-            X.hasCause(err, ClusterTopologyCheckedException.class) &&
-            storeFuture() &&
-            remapCnt.decrementAndGet() > 0) {
-            ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
-
-            if (!(topErr instanceof  ClusterTopologyServerNotFoundException)) {
-                CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class);
-
-                assert cause != null && cause.topologyVersion() != null : err;
-
-                remap(cause.failedKeys(), cause.topologyVersion());
-
-                return false;
-            }
-        }
-
         if (super.onDone(retval, err)) {
-            if (futVer != null)
-                cctx.mvcc().removeAtomicFuture(version());
-
-            GridFutureAdapter<Void> fut0;
-
-            synchronized (this) {
-                fut0 = topCompleteFut;
-            }
+            GridCacheVersion futVer = state.onFutureDone();
 
-            if (fut0 != null)
-                fut0.onDone();
+            if (futVer != null)
+                cctx.mvcc().removeAtomicFuture(futVer);
 
             return true;
         }
@@ -521,68 +325,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param res Update response.
      */
     public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
-        if (res.remapKeys() != null) {
-            assert !fastMap || cctx.kernalContext().clientNode();
-
-            Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
-
-            mapOnTopology(remapKeys, true, nodeId);
-
-            return;
-        }
-
-        GridCacheReturn ret = res.returnValue();
-
-        Boolean single0 = single;
-
-        if (single0 != null && single0) {
-            assert singleNodeId.equals(nodeId) : "Invalid response received for single-node mapped future " +
-                "[singleNodeId=" + singleNodeId + ", nodeId=" + nodeId + ", res=" + res + ']';
-
-            updateNear(singleReq, res);
-
-            if (res.error() != null) {
-                onDone(res.failedKeys() != null ?
-                    addFailedKeys(res.failedKeys(), singleReq.topologyVersion(), res.error()) : res.error());
-            }
-            else {
-                if (op == TRANSFORM) {
-                    if (ret != null)
-                        addInvokeResults(ret);
-
-                    onDone(opRes);
-                }
-                else {
-                    GridCacheReturn opRes0 = opRes = ret;
-
-                    onDone(opRes0);
-                }
-            }
-        }
-        else {
-            GridNearAtomicUpdateRequest req = mappings.get(nodeId);
-
-            if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft.
-                updateNear(req, res);
-
-                if (res.error() != null)
-                    addFailedKeys(req.keys(), req.topologyVersion(), res.error());
-                else {
-                    if (op == TRANSFORM) {
-                        assert !req.fastMap();
-
-                        if (ret != null)
-                            addInvokeResults(ret);
-                    }
-                    else if (req.fastMap() && req.hasPrimary())
-                        opRes = ret;
-                }
-
-                mappings.remove(nodeId);
-            }
-
-            checkComplete();
-        }
+        state.onResult(nodeId, res, false);
     }
 
     /**
@@ -602,12 +345,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /**
      * Maps future on ready topology.
-     *
-     * @param keys Keys to map.
-     * @param remap Boolean flag indicating if this is partial future remap.
-     * @param oldNodeId Old node ID if remap.
      */
-    private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) {
+    private void mapOnTopology() {
         cache.topology().readLock();
 
         AffinityTopologyVersion topVer = null;
@@ -634,11 +373,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
             else {
                 if (waitTopFut) {
+                    assert !topLocked : this;
+
                     fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                             cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                                 @Override public void run() {
-                                    mapOnTopology(keys, remap, oldNodeId);
+                                    mapOnTopology();
                                 }
                             });
                         }
@@ -654,232 +395,543 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             cache.topology().readUnlock();
         }
 
-        map0(topVer, keys, remap, oldNodeId);
+        state.map(topVer);
     }
 
     /**
-     * Checks if future is ready to be completed.
+     * @return {@code True} if future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
      */
-    private void checkComplete() {
-        boolean remap = false;
+    private boolean storeFuture() {
+        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+    }
 
-        synchronized (this) {
-            if (topVer != AffinityTopologyVersion.ZERO &&
-                ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty())) {
-                CachePartialUpdateCheckedException err0 = err;
+    /**
+     * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
+     * node and send updates in parallel to all participating nodes.
+     *
+     * @param key Key to map.
+     * @param topVer Topology version to map.
+     * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
+     * @return Collection of nodes to which key is mapped.
+     */
+    private Collection<ClusterNode> mapKey(
+        KeyCacheObject key,
+        AffinityTopologyVersion topVer,
+        boolean fastMap
+    ) {
+        GridCacheAffinityManager affMgr = cctx.affinity();
 
-                if (err0 != null)
-                    onDone(err0);
-                else {
-                    if (fastMapRemap) {
-                        assert cctx.kernalContext().clientNode();
+        // If we can send updates in parallel - do it.
+        return fastMap ?
+            cctx.topology().nodes(affMgr.partition(key), topVer) :
+            Collections.singletonList(affMgr.primary(key, topVer));
+    }
 
-                        remap = true;
+    /**
+     * Maps future to single node.
+     *
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+        if (cctx.localNodeId().equals(nodeId)) {
+            cache.updateAllAsyncInternal(nodeId, req,
+                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                    @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+                        onResult(res.nodeId(), res);
                     }
-                    else
-                        onDone(opRes);
-                }
-            }
+                });
         }
+        else {
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-        if (remap)
-            mapOnTopology(null, true, null);
-    }
+                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
-    /**
-     * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
-     */
-    private boolean storeFuture() {
-        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+                if (syncMode == FULL_ASYNC)
+                    onDone(new GridCacheReturn(cctx, true, null, true));
+            }
+            catch (IgniteCheckedException e) {
+                state.onSendError(req, e);
+            }
+        }
     }
 
     /**
-     * @param topVer Topology version.
-     * @param remapKeys Keys to remap or {@code null} to map all keys.
-     * @param remap Flag indicating if this is partial remap for this future.
-     * @param oldNodeId Old node ID if was remap.
+     * Sends messages to remote nodes and updates local cache.
+     *
+     * @param mappings Mappings to send.
      */
-    private void map0(
-        AffinityTopologyVersion topVer,
-        @Nullable Collection<?> remapKeys,
-        boolean remap,
-        @Nullable UUID oldNodeId) {
-        assert oldNodeId == null || remap || fastMapRemap;
+    private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
+        UUID locNodeId = cctx.localNodeId();
 
-        Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+        GridNearAtomicUpdateRequest locUpdate = null;
 
-        if (F.isEmpty(topNodes)) {
-            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
-                "left the grid)."));
+        // Send messages to remote nodes first, then run local update.
+        for (GridNearAtomicUpdateRequest req : mappings.values()) {
+            if (locNodeId.equals(req.nodeId())) {
+                assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
+                    ", req=" + req + ']';
 
-            return;
+                locUpdate = req;
+            }
+            else {
+                try {
+                    if (log.isDebugEnabled())
+                        log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+
+                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                }
+                catch (IgniteCheckedException e) {
+                    state.onSendError(req, e);
+                }
+            }
         }
 
-        if (futVer == null)
-            // Assign future version in topology read lock before first exception may be thrown.
-            futVer = cctx.versions().next(topVer);
+        if (locUpdate != null) {
+            cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                    @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+                        onResult(res.nodeId(), res);
+                    }
+                });
+        }
 
-        if (!remap && storeFuture())
-            cctx.mvcc().addAtomicFuture(version(), this);
+        if (syncMode == FULL_ASYNC)
+            onDone(new GridCacheReturn(cctx, true, null, true));
+    }
 
-        CacheConfiguration ccfg = cctx.config();
+    /**
+     * Mapping state of the update future (topology version, pending requests, error and result).
+     */
+    private class UpdateState {
+        /** Current topology version. */
+        private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
 
-        // Assign version on near node in CLOCK ordering mode even if fastMap is false.
-        if (updVer == null)
-            updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
+        /** */
+        private GridCacheVersion updVer;
 
-        if (updVer != null && log.isDebugEnabled())
-            log.debug("Assigned fast-map version for update on near node: " + updVer);
+        /** Topology version when got mapping error. */
+        private AffinityTopologyVersion mapErrTopVer;
 
-        if (keys.size() == 1 && !fastMap && (single == null || single)) {
-            assert remapKeys == null || remapKeys.size() == 1 : remapKeys;
+        /** Mappings if operation is mapped to more than one node. */
+        @GridToStringInclude
+        private Map<UUID, GridNearAtomicUpdateRequest> mappings;
 
-            Object key = F.first(keys);
+        /** Error. */
+        private CachePartialUpdateCheckedException err;
 
-            Object val;
-            GridCacheVersion conflictVer;
-            long conflictTtl;
-            long conflictExpireTime;
+        /** Future ID. */
+        private GridCacheVersion futVer;
 
-            if (vals != null) {
-                // Regular PUT.
-                val = F.first(vals);
-                conflictVer = null;
-                conflictTtl = CU.TTL_NOT_CHANGED;
-                conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-            }
-            else if (conflictPutVals != null) {
-                // Conflict PUT.
-                GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
+        /** Completion future for a particular topology version. */
+        private GridFutureAdapter<Void> topCompleteFut;
 
-                val = conflictPutVal.value();
-                conflictVer = conflictPutVal.version();
-                conflictTtl = conflictPutVal.ttl();
-                conflictExpireTime = conflictPutVal.expireTime();
-            }
-            else if (conflictRmvVals != null) {
-                // Conflict REMOVE.
-                val = null;
-                conflictVer = F.first(conflictRmvVals);
-                conflictTtl = CU.TTL_NOT_CHANGED;
-                conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-            }
-            else {
-                // Regular REMOVE.
-                val = null;
-                conflictVer = null;
-                conflictTtl = CU.TTL_NOT_CHANGED;
-                conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-            }
+        /** Keys to remap. */
+        private Collection<KeyCacheObject> remapKeys;
 
-            // We still can get here if user pass map with single element.
-            if (key == null) {
-                NullPointerException err = new NullPointerException("Null key.");
+        /** Not null if operation is mapped to single node. */
+        private GridNearAtomicUpdateRequest singleReq;
 
-                onDone(err);
+        /** Operation result. */
+        private GridCacheReturn opRes;
 
-                return;
-            }
+        /**
+         * @return Future version.
+         */
+        @Nullable synchronized GridCacheVersion futureVersion() {
+            return futVer;
+        }
 
-            if (val == null && op != GridCacheOperation.DELETE) {
-                NullPointerException err = new NullPointerException("Null value.");
+        /**
+         * @param nodeId Left node ID.
+         */
+        void onNodeLeft(UUID nodeId) {
+            GridNearAtomicUpdateResponse res = null;
 
-                onDone(err);
+            synchronized (this) {
+                GridNearAtomicUpdateRequest req;
+
+                if (singleReq != null)
+                    req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
+                else
+                    req = mappings != null ? mappings.get(nodeId) : null;
+
+                if (req != null) {
+                    res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion());
+
+                    res.addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before " +
+                        "response is received: " + nodeId));
+                }
+            }
+
+            if (res != null)
+                onResult(nodeId, res, true);
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @param res Response.
+         * @param nodeErr {@code True} if response was created on node failure.
+         */
+        void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+            GridNearAtomicUpdateRequest req;
+
+            AffinityTopologyVersion remapTopVer = null;
+
+            GridCacheReturn opRes0 = null;
+            CachePartialUpdateCheckedException err0 = null;
+
+            boolean rcvAll;
+
+            GridFutureAdapter<?> fut0 = null;
+
+            synchronized (this) {
+                if (!res.futureVersion().equals(futVer))
+                    return;
+
+                if (singleReq != null) {
+                    if (!singleReq.nodeId().equals(nodeId))
+                        return;
+
+                    req = singleReq;
+
+                    singleReq = null;
+
+                    rcvAll = true;
+                }
+                else {
+                    req = mappings != null ? mappings.remove(nodeId) : null;
+
+                    if (req != null)
+                        rcvAll = mappings.isEmpty();
+                    else
+                        return;
+                }
+
+                assert req != null && req.topologyVersion().equals(topVer) : req;
+
+                if (res.remapKeys() != null) {
+                    assert !fastMap || cctx.kernalContext().clientNode();
+
+                    if (remapKeys == null)
+                        remapKeys = U.newHashSet(res.remapKeys().size());
+
+                    remapKeys.addAll(res.remapKeys());
+
+                    if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
+                        mapErrTopVer = req.topologyVersion();
+                }
+                else if (res.error() != null) {
+                    if (res.failedKeys() != null)
+                        addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+                }
+                else {
+                    if (!req.fastMap() || req.hasPrimary()) {
+                        GridCacheReturn ret = res.returnValue();
+
+                        if (op == TRANSFORM) {
+                            if (ret != null)
+                                addInvokeResults(ret);
+                        }
+                        else
+                            opRes = ret;
+                    }
+                }
+
+                if (rcvAll) {
+                    if (remapKeys != null) {
+                        assert mapErrTopVer != null;
+
+                        remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
+                    }
+                    else {
+                        if (err != null &&
+                            X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+                            X.hasCause(err, ClusterTopologyCheckedException.class) &&
+                            storeFuture() &&
+                            --remapCnt > 0) {
+                            ClusterTopologyCheckedException topErr =
+                                X.cause(err, ClusterTopologyCheckedException.class);
+
+                            if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                                CachePartialUpdateCheckedException cause =
+                                    X.cause(err, CachePartialUpdateCheckedException.class);
+
+                                assert cause != null && cause.topologyVersion() != null : err;
+
+                                remapTopVer =
+                                    new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+
+                                err = null;
+
+                                Collection<Object> failedKeys = cause.failedKeys();
+
+                                remapKeys = new ArrayList<>(failedKeys.size());
+
+                                for (Object key : failedKeys)
+                                    remapKeys.add(cctx.toCacheKeyObject(key));
+
+                                updVer = null;
+                            }
+                        }
+                    }
+
+                    if (remapTopVer == null) {
+                        err0 = err;
+                        opRes0 = opRes;
+                    }
+                    else {
+                        fut0 = topCompleteFut;
+
+                        topCompleteFut = null;
+
+                        cctx.mvcc().removeAtomicFuture(futVer);
+
+                        futVer = null;
+                        topVer = AffinityTopologyVersion.ZERO;
+                    }
+                }
+            }
+
+            if (res.error() != null && res.failedKeys() == null) {
+                onDone(res.error());
 
                 return;
             }
 
-            KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+            if (!nodeErr && res.remapKeys() == null)
+                updateNear(req, res);
 
-            if (op != TRANSFORM)
-                val = cctx.toCacheObject(val);
+            if (remapTopVer != null) {
+                if (fut0 != null)
+                    fut0.onDone();
 
-            ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+                if (!waitTopFut) {
+                    onDone(new GridCacheTryPutFailedException());
+
+                    return;
+                }
+
+                if (topLocked) {
+                    assert !F.isEmpty(remapKeys) : remapKeys;
+
+                    CachePartialUpdateCheckedException e =
+                        new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+                    ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+                        "Failed to update keys, topology changed while execute atomic update inside transaction.");
+
+                    cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+                    e.add(remapKeys, cause);
+
+                    onDone(e);
+
+                    return;
+                }
+
+                IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.affinity().affinityReadyFuture(remapTopVer);
+
+                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                try {
+                                    AffinityTopologyVersion topVer = fut.get();
+
+                                    map(topVer);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    onDone(e);
+                                }
+                            }
+                        });
+                    }
+                });
 
-            if (primary == null) {
+                return;
+            }
+
+            if (rcvAll)
+                onDone(opRes0, err0);
+        }
+
+        /**
+         * @param req Request.
+         * @param e Error.
+         */
+        void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+            synchronized (this) {
+                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                    req.nodeId(),
+                    req.futureVersion());
+
+                res.addFailedKeys(req.keys(), e);
+
+                onResult(req.nodeId(), res, true);
+            }
+        }
+
+        /**
+         * @param topVer Topology version.
+         */
+        void map(AffinityTopologyVersion topVer) {
+            Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+
+            if (F.isEmpty(topNodes)) {
                 onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
                     "left the grid)."));
 
                 return;
             }
 
-            GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
-                cctx.cacheId(),
-                primary.id(),
-                futVer,
-                fastMap,
-                updVer,
-                topVer,
-                topLocked,
-                syncMode,
-                op,
-                retval,
-                expiryPlc,
-                invokeArgs,
-                filter,
-                subjId,
-                taskNameHash,
-                skipStore,
-                cctx.kernalContext().clientNode());
+            Exception err = null;
+            Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
 
-            req.addUpdateEntry(cacheKey,
-                val,
-                conflictTtl,
-                conflictExpireTime,
-                conflictVer,
-                true);
+            int size = keys.size();
 
             synchronized (this) {
+                assert futVer == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
                 this.topVer = topVer;
 
-                single = true;
+                futVer = cctx.versions().next(topVer);
+
+                if (storeFuture())
+                    cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this);
+
+                // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+                if (updVer == null)
+                    updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
+
+                if (updVer != null && log.isDebugEnabled())
+                    log.debug("Assigned fast-map version for update on near node: " + updVer);
+
+                try {
+                    if (size == 1 && !fastMap) {
+                        assert remapKeys == null || remapKeys.size() == 1;
+
+                        singleReq = mapSingleUpdate();
+                    }
+                    else {
+                        pendingMappings = mapUpdate(topNodes);
+
+                        if (pendingMappings.size() == 1)
+                            singleReq = F.firstValue(pendingMappings);
+                        else {
+                            if (syncMode == PRIMARY_SYNC) {
+                                mappings = U.newHashMap(pendingMappings.size());
+
+                                for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+                                    if (req.hasPrimary())
+                                        mappings.put(req.nodeId(), req);
+                                }
+                            }
+                            else
+                                mappings = new HashMap<>(pendingMappings);
+
+                            assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
+                        }
+                    }
+
+                    remapKeys = null;
+                }
+                catch (Exception e) {
+                    err = e;
+                }
+            }
+
+            if (err != null) {
+                onDone(err);
+
+                return;
             }
 
             // Optimize mapping for single key.
-            mapSingle(primary.id(), req);
+            if (singleReq != null)
+                mapSingle(singleReq.nodeId(), singleReq);
+            else {
+                assert pendingMappings != null;
 
-            return;
+                if (size == 0)
+                    onDone(new GridCacheReturn(cctx, true, null, true));
+                else
+                    doUpdate(pendingMappings);
+            }
         }
 
-        Iterator<?> it = null;
+        /**
+         * @param topVer Topology version.
+         * @return Future.
+         */
+        @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
+            if (this.topVer == AffinityTopologyVersion.ZERO)
+                return null;
 
-        if (vals != null)
-            it = vals.iterator();
+            if (this.topVer.compareTo(topVer) < 0) {
+                if (topCompleteFut == null)
+                    topCompleteFut = new GridFutureAdapter<>();
 
-        Iterator<GridCacheDrInfo> conflictPutValsIt = null;
+                return topCompleteFut;
+            }
 
-        if (conflictPutVals != null)
-            conflictPutValsIt = conflictPutVals.iterator();
+            return null;
+        }
 
-        Iterator<GridCacheVersion> conflictRmvValsIt = null;
+        /**
+         * @return Future version.
+         */
+        GridCacheVersion onFutureDone() {
+            GridCacheVersion ver0;
 
-        if (conflictRmvVals != null)
-            conflictRmvValsIt = conflictRmvVals.iterator();
+            GridFutureAdapter<Void> fut0;
 
-        Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
+            synchronized (this) {
+                fut0 = topCompleteFut;
 
-        // Must do this in synchronized block because we need to atomically remove and add mapping.
-        // Otherwise checkComplete() may see empty intermediate state.
-        synchronized (this) {
-            if (oldNodeId != null)
-                removeMapping(oldNodeId);
+                topCompleteFut = null;
 
-            // For fastMap mode wait for all responses before remapping.
-            if (remap && fastMap && !mappings.isEmpty()) {
-                fastMapRemap = true;
+                ver0 = futVer;
 
-                return;
+                futVer = null;
             }
 
-            // Create mappings first, then send messages.
-            for (Object key : keys) {
-                if (key == null) {
-                    NullPointerException err = new NullPointerException("Null key.");
+            if (fut0 != null)
+                fut0.onDone();
 
-                    onDone(err);
+            return ver0;
+        }
 
-                    return;
-                }
+        /**
+         * @param topNodes Cache nodes.
+         * @return Mapping.
+         * @throws Exception If failed.
+         */
+        private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception {
+            Iterator<?> it = null;
+
+            if (vals != null)
+                it = vals.iterator();
+
+            Iterator<GridCacheDrInfo> conflictPutValsIt = null;
+
+            if (conflictPutVals != null)
+                conflictPutValsIt = conflictPutVals.iterator();
+
+            Iterator<GridCacheVersion> conflictRmvValsIt = null;
+
+            if (conflictRmvVals != null)
+                conflictRmvValsIt = conflictRmvVals.iterator();
+
+            Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+
+            // Create mappings first, then send messages.
+            for (Object key : keys) {
+                if (key == null)
+                    throw new NullPointerException("Null key.");
 
                 Object val;
                 GridCacheVersion conflictVer;
@@ -892,13 +944,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     conflictTtl = CU.TTL_NOT_CHANGED;
                     conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
 
-                    if (val == null) {
-                        NullPointerException err = new NullPointerException("Null value.");
-
-                        onDone(err);
-
-                        return;
-                    }
+                    if (val == null)
+                        throw new NullPointerException("Null value.");
                 }
                 else if (conflictPutVals != null) {
                     GridCacheDrInfo conflictPutVal =  conflictPutValsIt.next();
@@ -934,22 +981,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                 Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
 
-                if (affNodes.isEmpty()) {
-                    onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                        "(all partition nodes left the grid)."));
-
-                    return;
-                }
+                if (affNodes.isEmpty())
+                    throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                        "(all partition nodes left the grid).");
 
                 int i = 0;
 
                 for (ClusterNode affNode : affNodes) {
-                    if (affNode == null) {
-                        onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                            "(all partition nodes left the grid)."));
-
-                        return;
-                    }
+                    if (affNode == null)
+                        throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                            "(all partition nodes left the grid).");
 
                     UUID nodeId = affNode.id();
 
@@ -976,11 +1017,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                             cctx.kernalContext().clientNode());
 
                         pendingMappings.put(nodeId, mapped);
-
-                        GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped);
-
-                        assert old == null || (old != null && remap) :
-                            "Invalid mapping state [old=" + old + ", remap=" + remap + ']';
                     }
 
                     mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
@@ -989,187 +1025,140 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 }
             }
 
-            this.topVer = topVer;
-
-            fastMapRemap = false;
-        }
-
-        if ((single == null || single) && pendingMappings.size() == 1) {
-            Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
-
-            single = true;
-
-            mapSingle(entry.getKey(), entry.getValue());
-
-            return;
+            return pendingMappings;
         }
-        else
-            single = false;
-
-        doUpdate(pendingMappings);
-    }
-
-    /**
-     * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
-     * node and send updates in parallel to all participating nodes.
-     *
-     * @param key Key to map.
-     * @param topVer Topology version to map.
-     * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
-     * @return Collection of nodes to which key is mapped.
-     */
-    private Collection<ClusterNode> mapKey(
-        KeyCacheObject key,
-        AffinityTopologyVersion topVer,
-        boolean fastMap
-    ) {
-        GridCacheAffinityManager affMgr = cctx.affinity();
 
-        // If we can send updates in parallel - do it.
-        return fastMap ?
-            cctx.topology().nodes(affMgr.partition(key), topVer) :
-            Collections.singletonList(affMgr.primary(key, topVer));
-    }
-
-    /**
-     * Maps future to single node.
-     *
-     * @param nodeId Node ID.
-     * @param req Request.
-     */
-    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
-        singleNodeId = nodeId;
-        singleReq = req;
-
-        if (cctx.localNodeId().equals(nodeId)) {
-            cache.updateAllAsyncInternal(nodeId, req,
-                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest req,
-                        GridNearAtomicUpdateResponse res) {
-                        assert res.futureVersion().equals(futVer) : futVer;
+        /**
+         * @return Request.
+         * @throws Exception If failed.
+         */
+        private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception {
+            Object key = F.first(keys);
 
-                        onResult(res.nodeId(), res);
-                    }
-                });
-        }
-        else {
-            try {
-                if (log.isDebugEnabled())
-                    log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+            Object val;
+            GridCacheVersion conflictVer;
+            long conflictTtl;
+            long conflictExpireTime;
 
-                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+            if (vals != null) {
+                // Regular PUT.
+                val = F.first(vals);
+                conflictVer = null;
+                conflictTtl = CU.TTL_NOT_CHANGED;
+                conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+            }
+            else if (conflictPutVals != null) {
+                // Conflict PUT.
+                GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
 
-                if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
-                    onDone(new GridCacheReturn(cctx, true, null, true));
+                val = conflictPutVal.value();
+                conflictVer = conflictPutVal.version();
+                conflictTtl = conflictPutVal.ttl();
+                conflictExpireTime = conflictPutVal.expireTime();
             }
-            catch (IgniteCheckedException e) {
-                onDone(addFailedKeys(req.keys(), req.topologyVersion(), e));
+            else if (conflictRmvVals != null) {
+                // Conflict REMOVE.
+                val = null;
+                conflictVer = F.first(conflictRmvVals);
+                conflictTtl = CU.TTL_NOT_CHANGED;
+                conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+            }
+            else {
+                // Regular REMOVE.
+                val = null;
+                conflictVer = null;
+                conflictTtl = CU.TTL_NOT_CHANGED;
+                conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
             }
-        }
-    }
-
-    /**
-     * Sends messages to remote nodes and updates local cache.
-     *
-     * @param mappings Mappings to send.
-     */
-    private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
-        UUID locNodeId = cctx.localNodeId();
 
-        GridNearAtomicUpdateRequest locUpdate = null;
+            // We can still get here if the user passes a map with a single element.
+            if (key == null)
+                throw new NullPointerException("Null key.");
 
-        // Send messages to remote nodes first, then run local update.
-        for (GridNearAtomicUpdateRequest req : mappings.values()) {
-            if (locNodeId.equals(req.nodeId())) {
-                assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
-                    ", req=" + req + ']';
+            if (val == null && op != GridCacheOperation.DELETE)
+                throw new NullPointerException("Null value.");
 
-                locUpdate = req;
-            }
-            else {
-                try {
-                    if (log.isDebugEnabled())
-                        log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+            KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
 
-                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
-                }
-                catch (IgniteCheckedException e) {
-                    addFailedKeys(req.keys(), req.topologyVersion(), e);
+            if (op != TRANSFORM)
+                val = cctx.toCacheObject(val);
 
-                    removeMapping(req.nodeId());
-                }
+            ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
 
-                if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
-                    removeMapping(req.nodeId());
-            }
-        }
+            if (primary == null)
+                throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+                    "left the grid).");
 
-        if (syncMode == FULL_ASYNC)
-            // In FULL_ASYNC mode always return (null, true).
-            opRes = new GridCacheReturn(cctx, true, null, true);
+            GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+                cctx.cacheId(),
+                primary.id(),
+                futVer,
+                fastMap,
+                updVer,
+                topVer,
+                topLocked,
+                syncMode,
+                op,
+                retval,
+                expiryPlc,
+                invokeArgs,
+                filter,
+                subjId,
+                taskNameHash,
+                skipStore,
+                cctx.kernalContext().clientNode());
 
-        if (locUpdate != null) {
-            cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
-                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest req,
-                        GridNearAtomicUpdateResponse res) {
-                        assert res.futureVersion().equals(futVer) : futVer;
+            req.addUpdateEntry(cacheKey,
+                val,
+                conflictTtl,
+                conflictExpireTime,
+                conflictVer,
+                true);
 
-                        onResult(res.nodeId(), res);
-                    }
-                });
+            return req;
         }
 
-        checkComplete();
-    }
-
-    /**
-     * Removes mapping from future mappings map.
-     *
-     * @param nodeId Node ID to remove mapping for.
-     */
-    private void removeMapping(UUID nodeId) {
-        mappings.remove(nodeId);
-    }
-
-    /**
-     * @param ret Result from single node.
-     */
-    @SuppressWarnings("unchecked")
-    private synchronized void addInvokeResults(GridCacheReturn ret) {
-        assert op == TRANSFORM : op;
-        assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
-        if (ret.value() != null) {
-            if (opRes != null)
-                opRes.mergeEntryProcessResults(ret);
-            else
-                opRes = ret;
+        /**
+         * @param ret Result from single node.
+         */
+        @SuppressWarnings("unchecked")
+        private void addInvokeResults(GridCacheReturn ret) {
+            assert op == TRANSFORM : op;
+            assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+            if (ret.value() != null) {
+                if (opRes != null)
+                    opRes.mergeEntryProcessResults(ret);
+                else
+                    opRes = ret;
+            }
         }
-    }
 
-    /**
-     * @param failedKeys Failed keys.
-     * @param topVer Topology version for failed update.
-     * @param err Error cause.
-     * @return Root {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException}.
-     */
-    private synchronized IgniteCheckedException addFailedKeys(Collection<KeyCacheObject> failedKeys,
-        AffinityTopologyVersion topVer,
-        Throwable err) {
-        CachePartialUpdateCheckedException err0 = this.err;
+        /**
+         * @param failedKeys Failed keys.
+         * @param topVer Topology version for failed update.
+         * @param err Error cause.
+         */
+        private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+            AffinityTopologyVersion topVer,
+            Throwable err) {
+            CachePartialUpdateCheckedException err0 = this.err;
 
-        if (err0 == null)
-            err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+            if (err0 == null)
+                err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
 
-        Collection<Object> keys = new ArrayList<>(failedKeys.size());
+            Collection<Object> keys = new ArrayList<>(failedKeys.size());
 
-        for (KeyCacheObject key : failedKeys)
-            keys.add(key.value(cctx.cacheObjectContext(), false));
+            for (KeyCacheObject key : failedKeys)
+                keys.add(key.value(cctx.cacheObjectContext(), false));
 
-        err0.add(keys, err, topVer);
+            err0.add(keys, err, topVer);
+        }
 
-        return err0;
+        /** {@inheritDoc} */
+        @Override public synchronized  String toString() {
+            return S.toString(UpdateState.class, this);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8b625a3a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 2064338..d56ed7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -128,7 +128,8 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
     /**
      * @param ignoreChildFailures Flag indicating whether compound future should ignore child futures failures.
      */
-    public void ignoreChildFailures(Class<? extends Throwable>... ignoreChildFailures) {
+    @SafeVarargs
+    public final void ignoreChildFailures(Class<? extends Throwable>... ignoreChildFailures) {
         this.ignoreChildFailures = ignoreChildFailures;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8b625a3a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
index d5c5314..fc9dad0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
@@ -421,6 +421,7 @@ public final class X {
      * @return {@code True} if one of the causing exception is an instance of passed in classes,
      *      {@code false} otherwise.
      */
+    @SafeVarargs
     public static boolean hasCause(@Nullable Throwable t, @Nullable Class<? extends Throwable>... cls) {
         if (t == null || F.isEmpty(cls))
             return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8b625a3a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java
index f681e59..97590b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerEagerTtlDisabledTest.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.processors.cache;
 
 
+import org.apache.ignite.configuration.*;
+
 /**
- * Tests expire events when {@link GridCacheConfiguration#isEagerTtl()} is disabled.
+ * Tests expire events when {@link CacheConfiguration#isEagerTtl()} is disabled.
  */
 public class IgniteCacheEntryListenerEagerTtlDisabledTest extends IgniteCacheEntryListenerTxTest {
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8b625a3a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 5d0cacc..caf4699 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -27,12 +27,14 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.testframework.*;
 
+import javax.cache.processor.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
 import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
 
 /**
  *
@@ -46,7 +48,9 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
     /**
      * @return Keys count for the test.
      */
-    protected abstract int keysCount();
+    private int keysCount() {
+        return 10_000;
+    }
 
     /** {@inheritDoc} */
     @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
@@ -54,7 +58,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
 
         cfg.setAtomicWriteOrderMode(writeOrderMode());
         cfg.setBackups(1);
-        cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cfg.setRebalanceMode(SYNC);
 
         return cfg;
     }
@@ -78,25 +82,47 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
     protected CacheAtomicWriteOrderMode writeOrderMode() {
         return CLOCK;
     }
+
     /**
      * @throws Exception If failed.
      */
     public void testPut() throws Exception {
-        checkPut(false);
+        checkRetry(Test.PUT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAll() throws Exception {
+        checkRetry(Test.PUT_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAsync() throws Exception {
-        checkPut(true);
+        checkRetry(Test.PUT_ASYNC);
     }
 
     /**
-     * @param async If {@code true} tests asynchronous put.
      * @throws Exception If failed.
      */
-    private void checkPut(boolean async) throws Exception {
+    public void testInvoke() throws Exception {
+        checkRetry(Test.INVOKE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeAll() throws Exception {
+        checkRetry(Test.INVOKE_ALL);
+    }
+
+    /**
+     * @param test Test type.
+     * @throws Exception If failed.
+     */
+    private void checkRetry(Test test) throws Exception {
         final AtomicBoolean finished = new AtomicBoolean();
 
         int keysCnt = keysCount();
@@ -115,52 +141,151 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
             }
         });
 
+        IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
 
-        IgniteCache<Object, Object> cache = ignite(0).cache(null);
+        int iter = 0;
 
-        if (atomicityMode() == ATOMIC)
-            assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode());
+        try {
+            if (atomicityMode() == ATOMIC)
+                assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode());
 
-        int iter = 0;
+            long stopTime = System.currentTimeMillis() + 60_000;
 
-        long stopTime = System.currentTimeMillis() + 60_000;
+            switch (test) {
+                case PUT: {
+                    while (System.currentTimeMillis() < stopTime) {
+                        Integer val = ++iter;
 
-        if (async) {
-            IgniteCache<Object, Object> cache0 = cache.withAsync();
+                        for (int i = 0; i < keysCnt; i++)
+                            cache.put(i, val);
 
-            while (System.currentTimeMillis() < stopTime) {
-                Integer val = ++iter;
+                        for (int i = 0; i < keysCnt; i++)
+                            assertEquals(val, cache.get(i));
+                    }
 
-                for (int i = 0; i < keysCnt; i++) {
-                    cache0.put(i, val);
+                    break;
+                }
+
+                case PUT_ALL: {
+                    while (System.currentTimeMillis() < stopTime) {
+                        Integer val = ++iter;
+
+                        Map<Integer, Integer> map = new LinkedHashMap<>();
+
+                        for (int i = 0; i < keysCnt; i++) {
+                            map.put(i, val);
+
+                            if (map.size() == 100 || i == keysCnt - 1) {
+                                cache.putAll(map);
+
+                                map.clear();
+                            }
+                        }
 
-                    cache0.future().get();
+                        for (int i = 0; i < keysCnt; i++)
+                            assertEquals(val, cache.get(i));
+                    }
                 }
 
-                for (int i = 0; i < keysCnt; i++) {
-                    cache0.get(i);
+                case PUT_ASYNC: {
+                    IgniteCache<Integer, Integer> cache0 = cache.withAsync();
+
+                    while (System.currentTimeMillis() < stopTime) {
+                        Integer val = ++iter;
+
+                        for (int i = 0; i < keysCnt; i++) {
+                            cache0.put(i, val);
+
+                            cache0.future().get();
+                        }
+
+                        for (int i = 0; i < keysCnt; i++) {
+                            cache0.get(i);
+
+                            assertEquals(val, cache0.future().get());
+                        }
+                    }
 
-                    assertEquals(val, cache0.future().get());
+                    break;
                 }
-            }
-        }
-        else {
-            while (System.currentTimeMillis() < stopTime) {
-                Integer val = ++iter;
 
-                for (int i = 0; i < keysCnt; i++)
-                    cache.put(i, val);
+                case INVOKE: {
+                    while (System.currentTimeMillis() < stopTime) {
+                        Integer val = ++iter;
+
+                        Integer expOld = iter - 1;
+
+                        for (int i = 0; i < keysCnt; i++) {
+                            Integer old = cache.invoke(i, new SetEntryProcessor(val));
+
+                            assertNotNull(old);
+                            assertTrue(old.equals(expOld) || old.equals(val));
+                        }
+
+                        for (int i = 0; i < keysCnt; i++)
+                            assertEquals(val, cache.get(i));
+                    }
+
+                    break;
+                }
 
-                for (int i = 0; i < keysCnt; i++)
-                    assertEquals(val, cache.get(i));
+                case INVOKE_ALL: {
+                    while (System.currentTimeMillis() < stopTime) {
+                        Integer val = ++iter;
+
+                        Integer expOld = iter - 1;
+
+                        Set<Integer> keys = new LinkedHashSet<>();
+
+                        for (int i = 0; i < keysCnt; i++) {
+                            keys.add(i);
+
+                            if (keys.size() == 100 || i == keysCnt - 1) {
+                                Map<Integer, EntryProcessorResult<Integer>> resMap =
+                                    cache.invokeAll(keys, new SetEntryProcessor(val));
+
+                                for (Integer key : keys) {
+                                    EntryProcessorResult<Integer> res = resMap.get(key);
+
+                                    assertNotNull(res);
+
+                                    Integer old = res.get();
+
+                                    assertTrue(old.equals(expOld) || old.equals(val));
+                                }
+
+                                assertEquals(keys.size(), resMap.size());
+
+                                keys.clear();
+                            }
+                        }
+
+                        for (int i = 0; i < keysCnt; i++)
+                            assertEquals(val, cache.get(i));
+                    }
+
+                    break;
+                }
+
+                default:
+                    assert false : test;
             }
         }
-
-        finished.set(true);
-        fut.get();
+        finally {
+            finished.set(true);
+            fut.get();
+        }
 
         for (int i = 0; i < keysCnt; i++)
-            assertEquals(iter, cache.get(i));
+            assertEquals((Integer)iter, cache.get(i));
+
+        for (int i = 0; i < gridCount(); i++) {
+            IgniteKernal ignite = (IgniteKernal)grid(i);
+
+            Collection<?> futs = ignite.context().cache().context().mvcc().atomicFutures();
+
+            assertTrue("Unexpected atomic futures: " + futs, futs.isEmpty());
+        }
     }
 
     /**
@@ -201,34 +326,41 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
         try {
             int keysCnt = keysCount();
 
-        boolean eThrown = false;
+            boolean eThrown = false;
 
-        IgniteCache<Object, Object> cache = ignite(0).cache(null).withNoRetries();
+            IgniteCache<Object, Object> cache = ignite(0).cache(null).withNoRetries();
 
-        if (async)
-            cache = cache.withAsync();
+            if (async)
+                cache = cache.withAsync();
 
-        for (int i = 0; i < keysCnt; i++) {
-            try {
-                if (async) {
-                    cache.put(i, i);
+            long stopTime = System.currentTimeMillis() + 60_000;
 
-                    cache.future().get();
+            while (System.currentTimeMillis() < stopTime) {
+                for (int i = 0; i < keysCnt; i++) {
+                    try {
+                        if (async) {
+                            cache.put(i, i);
+
+                            cache.future().get();
+                        }
+                        else
+                            cache.put(i, i);
+                    }
+                    catch (Exception e) {
+                        assertTrue("Invalid exception: " + e,
+                            X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class));
+
+                        eThrown = true;
+
+                        break;
+                    }
                 }
-                else
-                    cache.put(i, i);
-            }
-            catch (Exception e) {
-                assertTrue("Invalid exception: " + e,
-                    X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class));
-
-                eThrown = true;
 
+                if (eThrown)
                     break;
-                }
             }
 
-        assertTrue(eThrown);
+            assertTrue(eThrown);
 
             finished.set(true);
 
@@ -243,4 +375,48 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
     @Override protected long getTestTimeout() {
         return 3 * 60 * 1000;
     }
+
+    /**
+     * Cache operations exercised by {@link #checkRetry(Test)}.
+     */
+    enum Test {
+        /** Sequential {@code put} calls, one per key. */
+        PUT,
+
+        /** {@code putAll} calls in batches of up to 100 keys. */
+        PUT_ALL,
+
+        /** {@code put} calls through {@code withAsync()}, waiting on each future. */
+        PUT_ASYNC,
+
+        /** Single-key {@code invoke} calls with {@link SetEntryProcessor}. */
+        INVOKE,
+
+        /** {@code invokeAll} calls in batches of up to 100 keys. */
+        INVOKE_ALL
+    }
+
+    /**
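+     * Entry processor that sets a new value and returns the previous one ({@code 0} if it was {@code null}).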
+     */
+    class SetEntryProcessor implements CacheEntryProcessor<Integer, Integer, Integer> {
+        /** Value to set. */
+        private Integer val;
+
+        /**
+         * @param val Value.
+         */
+        public SetEntryProcessor(Integer val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<Integer, Integer> e, Object... args) {
+            Integer old = e.getValue();
+
+            e.setValue(val);
+
+            return old == null ? 0 : old;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8b625a3a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
index e76663a..be442d2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
@@ -16,7 +16,21 @@
  */
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
 
 /**
  *
@@ -24,11 +38,66 @@ import org.apache.ignite.cache.*;
 public class IgniteCachePutRetryAtomicSelfTest extends IgniteCachePutRetryAbstractSelfTest {
     /** {@inheritDoc} */
     @Override protected CacheAtomicityMode atomicityMode() {
-        return CacheAtomicityMode.ATOMIC;
+        return ATOMIC;
     }
 
-    /** {@inheritDoc} */
-    @Override protected int keysCount() {
-        return 60_000;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutInsideTransaction() throws Exception {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName("tx-cache");
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+
+        try (IgniteCache<Integer, Integer> txCache = ignite(0).getOrCreateCache(ccfg)) {
+            final AtomicBoolean finished = new AtomicBoolean();
+
+            IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    while (!finished.get()) {
+                        stopGrid(3);
+
+                        U.sleep(300);
+
+                        startGrid(3);
+                    }
+
+                    return null;
+                }
+            });
+
+            try {
+                IgniteTransactions txs = ignite(0).transactions();
+
+                IgniteCache<Object, Object> cache = ignite(0).cache(null);
+
+                long stopTime = System.currentTimeMillis() + 60_000;
+
+                while (System.currentTimeMillis() < stopTime) {
+                    for (int i = 0; i < 10_000; i++) {
+                        try {
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                txCache.put(0, 0);
+
+                                cache.put(i, i);
+
+                                tx.commit();
+                            }
+                        }
+                        catch (IgniteException | CacheException e) {
+                            log.info("Ignore exception: " + e);
+                        }
+                    }
+                }
+
+                finished.set(true);
+
+                fut.get();
+            }
+            finally {
+                finished.set(true);
+            }
+        }
     }
 }


Mime
View raw message