ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: # ignite-1124 WIP
Date Tue, 25 Aug 2015 14:35:18 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1124 [created] e3108238e


# ignite-1124 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e3108238
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e3108238
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e3108238

Branch: refs/heads/ignite-1124
Commit: e3108238e0f99078ece5a026780a25562af63795
Parents: 7b61a09
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Aug 25 17:35:05 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Aug 25 17:35:05 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAtomicFuture.java |    5 -
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |    7 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 1385 +++++++++---------
 3 files changed, 660 insertions(+), 737 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e3108238/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 8724d3a..e64a9e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -27,11 +27,6 @@ import java.util.*;
  */
 public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
     /**
-     * @return Future topology version.
-     */
-    public AffinityTopologyVersion topologyVersion();
-
-    /**
      * Gets future that will be completed when it is safe when update is finished on the given version of topology.
      *
      * @param topVer Topology version to finish.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3108238/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4b1a58f..04128b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -168,13 +168,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     }
 
     /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return updateReq.topologyVersion();
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
-        if (waitForExchange && topologyVersion().compareTo(topVer) < 0)
+        if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
             return this;
 
         return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3108238/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 07ec808..0014f86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
@@ -64,9 +63,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     /** Cache. */
     private GridDhtAtomicCache cache;
 
-    /** Future ID. */
-    private volatile GridCacheVersion futVer;
-
     /** Update operation. */
     private final GridCacheOperation op;
 
@@ -88,13 +84,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Collection<GridCacheVersion> conflictRmvVals;
 
-    /** Mappings. */
-    @GridToStringInclude
-    private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
-
-    /** Error. */
-    private volatile CachePartialUpdateCheckedException err;
-
     /** Operation result. */
     private volatile GridCacheReturn opRes;
 
@@ -104,39 +93,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     /** Expiry policy. */
     private final ExpiryPolicy expiryPlc;
 
-    /** Future map topology version. */
-    private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
-
-    /** Completion future for a particular topology version. */
-    private GridFutureAdapter<Void> topCompleteFut;
-
     /** Optional filter. */
     private final CacheEntryPredicate[] filter;
 
     /** Write synchronization mode. */
     private final CacheWriteSynchronizationMode syncMode;
 
-    /** If this future mapped to single node. */
-    private volatile Boolean single;
-
-    /** If this future is mapped to a single node, this field will contain that node ID. */
-    private UUID singleNodeId;
-
-    /** Single update request. */
-    private GridNearAtomicUpdateRequest singleReq;
-
     /** Raw return value flag. */
     private final boolean rawRetval;
 
     /** Fast map flag. */
     private final boolean fastMap;
 
-    /** */
-    private boolean fastMapRemap;
-
-    /** */
-    private GridCacheVersion updVer;
-
     /** Near cache flag. */
     private final boolean nearEnabled;
 
@@ -158,582 +126,401 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     /** Remap count. */
     private AtomicInteger remapCnt;
 
+    /** State. */
+    private final UpdateState state;
+
     /**
-     * @param cctx Cache context.
-     * @param cache Cache instance.
-     * @param syncMode Write synchronization mode.
-     * @param op Update operation.
-     * @param keys Keys to update.
-     * @param vals Values or transform closure.
-     * @param invokeArgs Optional arguments for entry processor.
-     * @param conflictPutVals Conflict put values (optional).
-     * @param conflictRmvVals Conflict remove values (optional).
-     * @param retval Return value require flag.
-     * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
-     * @param expiryPlc Expiry policy explicitly specified for cache operation.
-     * @param filter Entry filter.
-     * @param subjId Subject ID.
-     * @param taskNameHash Task name hash code.
-     * @param skipStore Skip store flag.
+     *
      */
-    public GridNearAtomicUpdateFuture(
-        GridCacheContext cctx,
-        GridDhtAtomicCache cache,
-        CacheWriteSynchronizationMode syncMode,
-        GridCacheOperation op,
-        Collection<?> keys,
-        @Nullable Collection<?> vals,
-        @Nullable Object[] invokeArgs,
-        @Nullable Collection<GridCacheDrInfo> conflictPutVals,
-        @Nullable Collection<GridCacheVersion> conflictRmvVals,
-        final boolean retval,
-        final boolean rawRetval,
-        @Nullable ExpiryPolicy expiryPlc,
-        final CacheEntryPredicate[] filter,
-        UUID subjId,
-        int taskNameHash,
-        boolean skipStore,
-        int remapCnt,
-        boolean waitTopFut
-    ) {
-        this.rawRetval = rawRetval;
-
-        assert vals == null || vals.size() == keys.size();
-        assert conflictPutVals == null || conflictPutVals.size() == keys.size();
-        assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
-        assert subjId != null;
-
-        this.cctx = cctx;
-        this.cache = cache;
-        this.syncMode = syncMode;
-        this.op = op;
-        this.keys = keys;
-        this.vals = vals;
-        this.invokeArgs = invokeArgs;
-        this.conflictPutVals = conflictPutVals;
-        this.conflictRmvVals = conflictRmvVals;
-        this.retval = retval;
-        this.expiryPlc = expiryPlc;
-        this.filter = filter;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
-        this.skipStore = skipStore;
-        this.waitTopFut = waitTopFut;
-
-        if (log == null)
-            log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
-
-        mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
-
-        fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
-            cctx.config().getAtomicWriteOrderMode() == CLOCK &&
-            !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
+    private class UpdateState {
+        /** */
+        private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
 
-        nearEnabled = CU.isNearEnabled(cctx);
+        /** */
+        private GridCacheVersion updVer;
 
-        if (!waitTopFut)
-            remapCnt = 1;
+        /** */
+        private AffinityTopologyVersion remapErrTopVer;
 
-        this.remapCnt = new AtomicInteger(remapCnt);
-    }
+        /** Mappings. */
+        @GridToStringInclude
+        private Map<UUID, GridNearAtomicUpdateRequest> mappings;
 
-    /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        return futVer.asGridUuid();
-    }
+        /** Error. */
+        private CachePartialUpdateCheckedException err;
 
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        return futVer;
-    }
+        /** Future ID. */
+        private GridCacheVersion futVer;
 
-    /** {@inheritDoc} */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
-    }
+        /** Completion future for a particular topology version. */
+        private GridFutureAdapter<Void> topCompleteFut;
 
-    /**
-     * @return {@code True} if this future should block partition map exchange.
-     */
-    private boolean waitForPartitionExchange() {
-        // Wait fast-map near atomic update futures in CLOCK mode.
-        return fastMap;
-    }
+        /** */
+        private Collection<KeyCacheObject> remapKeys;
 
-    /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
+        /** */
+        private GridNearAtomicUpdateRequest singleReq;
 
-    /** {@inheritDoc} */
-    @Override public Collection<?> keys() {
-        return keys;
-    }
+        /**
+         * @param failedKeys Failed keys.
+         * @param topVer Topology version for failed update.
+         * @param err Error cause.
+         */
+        private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+            AffinityTopologyVersion topVer,
+            Throwable err) {
+            CachePartialUpdateCheckedException err0 = this.err;
 
-    /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        Boolean single0 = single;
+            if (err0 == null)
+                err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
 
-        if (single0 != null && single0) {
-            if (singleNodeId.equals(nodeId)) {
-                onDone(addFailedKeys(
-                    singleReq.keys(),
-                    singleReq.topologyVersion(),
-                    new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId)));
+            Collection<Object> keys = new ArrayList<>(failedKeys.size());
 
-                return true;
-            }
+            for (KeyCacheObject key : failedKeys)
+                keys.add(key.value(cctx.cacheObjectContext(), false));
 
-            return false;
+            err0.add(keys, err, topVer);
         }
 
-        GridNearAtomicUpdateRequest req = mappings.get(nodeId);
-
-        if (req != null) {
-            addFailedKeys(req.keys(),
-                req.topologyVersion(),
-                new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId));
-
-            mappings.remove(nodeId);
-
-            checkComplete();
-
-            return true;
+        synchronized IgniteUuid futureId() {
+            return futVer.asGridUuid();
         }
 
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        // No-op.
-    }
-
-    /**
-     * Performs future mapping.
-     */
-    public void map() {
-        AffinityTopologyVersion topVer = null;
-
-        IgniteInternalTx tx = cctx.tm().anyActiveThreadTx();
-
-        if (tx != null && tx.topologyVersionSnapshot() != null)
-            topVer = tx.topologyVersionSnapshot();
-
-        if (topVer == null)
-            topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
-
-        if (topVer == null)
-            mapOnTopology(null, false, null);
-        else {
-            topLocked = true;
-
-            // Cannot remap.
-            remapCnt.set(1);
-
-            map0(topVer, null, false, null);
+        synchronized GridCacheVersion futureVersion() {
+            return futVer;
         }
-    }
 
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
-        if (waitForPartitionExchange() && topologyVersion().compareTo(topVer) < 0) {
-            GridFutureAdapter<Void> fut = null;
+        /**
+         * @param nodeId Left node ID.
+         */
+        void onNodeLeft(UUID nodeId) {
+            GridNearAtomicUpdateResponse res = null;
 
             synchronized (this) {
-                if (this.topVer == AffinityTopologyVersion.ZERO)
-                    return null;
+                GridNearAtomicUpdateRequest req;
+
+                if (singleReq != null)
+                    req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
+                else
+                    req = mappings.get(nodeId);
 
-                if (this.topVer.compareTo(topVer) < 0) {
-                    if (topCompleteFut == null)
-                        topCompleteFut = new GridFutureAdapter<>();
+                if (req != null) {
+                    res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion());
 
-                    fut = topCompleteFut;
+                    res.addFailedKeys(req.keys(),
+                        new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId));
                 }
             }
 
-            if (fut != null && isDone())
-                fut.onDone();
-
-            return fut;
+            if (res != null)
+                onResult(nodeId, res);
         }
 
-        return null;
-    }
-
-    /**
-     * @param failed Keys to remap.
-     * @param errTopVer Topology version for failed update.
-     */
-    private void remap(Collection<?> failed, AffinityTopologyVersion errTopVer) {
-        assert errTopVer != null;
+        /**
+         * @param ret Result from single node.
+         */
+        @SuppressWarnings("unchecked")
+        private void addInvokeResults(GridCacheReturn ret) {
+            assert op == TRANSFORM : op;
+            assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+            if (ret.value() != null) {
+                if (opRes != null)
+                    opRes.mergeEntryProcessResults(ret);
+                else
+                    opRes = ret;
+            }
+        }
 
-        GridCacheVersion futVer0 = futVer;
+        /**
+         * @param nodeId Node ID.
+         * @param res Response.
+         */
+        void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+            GridNearAtomicUpdateRequest req;
 
-        if (futVer0 == null || cctx.mvcc().removeAtomicFuture(futVer0) == null)
-            return;
+            AffinityTopologyVersion errTopVer = null;
 
-        Collection<Object> remapKeys = new ArrayList<>(failed.size());
-        Collection<Object> remapVals = vals != null ? new ArrayList<>(failed.size()) : null;
-        Collection<GridCacheDrInfo> remapConflictPutVals = conflictPutVals != null ? new ArrayList<GridCacheDrInfo>(failed.size()) : null;
-        Collection<GridCacheVersion> remapConflictRmvVals = conflictRmvVals != null ? new ArrayList<GridCacheVersion>(failed.size()) : null;
+            GridCacheReturn opRes0 = null;
+            CachePartialUpdateCheckedException err0 = null;
 
-        Iterator<?> keyIt = keys.iterator();
-        Iterator<?> valsIt = vals != null ? vals.iterator() : null;
-        Iterator<GridCacheDrInfo> conflictPutValsIt = conflictPutVals != null ? conflictPutVals.iterator() : null;
-        Iterator<GridCacheVersion> conflictRmvValsIt = conflictRmvVals != null ? conflictRmvVals.iterator() : null;
+            boolean rcvAll;
 
-        for (Object key : failed) {
-            while (keyIt.hasNext()) {
-                Object nextKey = keyIt.next();
-                Object nextVal = valsIt != null ? valsIt.next() : null;
-                GridCacheDrInfo nextConflictPutVal = conflictPutValsIt != null ? conflictPutValsIt.next() : null;
-                GridCacheVersion nextConflictRmvVal = conflictRmvValsIt != null ? conflictRmvValsIt.next() : null;
+            synchronized (this) {
+                if (!res.futureVersion().equals(futVer))
+                    return;
 
-                if (F.eq(key, nextKey)) {
-                    remapKeys.add(nextKey);
+                if (singleReq != null) {
+                    if (!singleReq.nodeId().equals(nodeId))
+                        return;
 
-                    if (remapVals != null)
-                        remapVals.add(nextVal);
+                    req = singleReq;
 
-                    if (remapConflictPutVals != null)
-                        remapConflictPutVals.add(nextConflictPutVal);
+                    rcvAll = true;
+                }
+                else {
+                    req = mappings.remove(nodeId);
 
-                    if (remapConflictRmvVals != null)
-                        remapConflictRmvVals.add(nextConflictRmvVal);
+                    if (req != null) {
+                        rcvAll = mappings.isEmpty();
 
-                    break;
+                        topVer = req.topologyVersion();
+                    }
+                    else
+                        return;
                 }
-            }
-        }
 
-        keys = remapKeys;
-        vals = remapVals;
-        conflictPutVals = remapConflictPutVals;
-        conflictRmvVals = remapConflictRmvVals;
+                if (res.remapKeys() != null) {
+                    assert !fastMap || cctx.kernalContext().clientNode();
+
+                    if (remapKeys == null)
+                        remapKeys = new ArrayList<>(res.remapKeys().size());
 
-        single = null;
-        futVer = null;
-        err = null;
-        opRes = null;
+                    remapKeys.addAll(res.remapKeys());
 
-        GridFutureAdapter<Void> fut0;
+                    if (remapErrTopVer == null || remapErrTopVer.compareTo(req.topologyVersion()) < 0)
+                        remapErrTopVer = req.topologyVersion();
+                }
+                else if (res.error() != null) {
+                    addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+                }
+                else {
+                    if (!req.fastMap() || req.hasPrimary()) {
+                        GridCacheReturn ret = res.returnValue();
 
-        synchronized (this) {
-            mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
+                        if (op == TRANSFORM) {
+                            if (ret != null)
+                                addInvokeResults(ret);
+                        }
+                        else
+                            opRes = ret;
+                    }
+                }
 
-            assert topVer != null && topVer.topologyVersion() > 0 : this;
+                if (rcvAll) {
+                    if (remapKeys != null) {
+                        assert remapErrTopVer != null;
 
-            topVer = AffinityTopologyVersion.ZERO;
+                        errTopVer = remapErrTopVer;
+                    }
+                    else {
+                        if (err != null &&
+                            X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+                            X.hasCause(err, ClusterTopologyCheckedException.class) &&
+                            storeFuture() &&
+                            remapCnt.decrementAndGet() > 0) {
+                            ClusterTopologyCheckedException topErr =
+                                X.cause(err, ClusterTopologyCheckedException.class);
 
-            fut0 = topCompleteFut;
+                            if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+                                CachePartialUpdateCheckedException cause =
+                                    X.cause(err, CachePartialUpdateCheckedException.class);
 
-            topCompleteFut = null;
-        }
+                                assert cause != null && cause.topologyVersion() != null : err;
 
-        if (fut0 != null)
-            fut0.onDone();
+                                errTopVer = cause.topologyVersion();
 
-        singleNodeId = null;
-        singleReq = null;
-        fastMapRemap = false;
-        updVer = null;
-        topLocked = false;
+                                err = null;
 
-        IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(errTopVer.topologyVersion() + 1);
+                                Collection<Object> failedKeys = cause.failedKeys();
 
-        fut.listen(new CI1<IgniteInternalFuture<?>>() {
-            @Override public void apply(final IgniteInternalFuture<?> fut) {
-                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                    @Override public void run() {
-                        try {
-                            fut.get();
+                                for (Object key : failedKeys)
+                                    remapKeys.add(cctx.toCacheKeyObject(key));
 
-                            map();
-                        }
-                        catch (IgniteCheckedException e) {
-                            onDone(e);
+                                updVer = null;
+                            }
                         }
                     }
-                });
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ConstantConditions")
-    @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
-        assert res == null || res instanceof GridCacheReturn;
-
-        GridCacheReturn ret = (GridCacheReturn)res;
 
-        Object retval =
-            res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? ret.value() : ret.success();
+                    if (errTopVer == null) {
+                        err0 = err;
+                        opRes0 = opRes;
+                    }
+                    else {
+                        cctx.mvcc().removeAtomicFuture(futVer);
 
-        if (op == TRANSFORM && retval == null)
-            retval = Collections.emptyMap();
+                        futVer = null;
+                        singleReq = null;
+                        topVer = AffinityTopologyVersion.ZERO;
+                    }
+                }
+            }
 
-        if (err != null && X.hasCause(err, CachePartialUpdateCheckedException.class) &&
-            X.hasCause(err, ClusterTopologyCheckedException.class) &&
-            storeFuture() &&
-            remapCnt.decrementAndGet() > 0) {
-            ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
+            updateNear(req, res);
 
-            if (!(topErr instanceof  ClusterTopologyServerNotFoundException)) {
-                CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class);
+            if (errTopVer != null) {
+                IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(errTopVer.topologyVersion() + 1);
 
-                assert cause != null && cause.topologyVersion() != null : err;
+                fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(final IgniteInternalFuture<?> fut) {
+                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                try {
+                                    fut.get();
 
-                remap(cause.failedKeys(), cause.topologyVersion());
+                                    mapOnTopology();
+                                }
+                                catch (IgniteCheckedException e) {
+                                    onDone(e);
+                                }
+                            }
+                        });
+                    }
+                });
 
-                return false;
+                return;
             }
-        }
-
-        if (super.onDone(retval, err)) {
-            if (futVer != null)
-                cctx.mvcc().removeAtomicFuture(version());
 
-            GridFutureAdapter<Void> fut0;
+            if (err0 != null)
+                onDone(err0);
+            else if (rcvAll)
+                onDone(opRes0);
+        }
 
+        /**
+         * @param req Request.
+         * @param e Error.
+         */
+        void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
             synchronized (this) {
-                fut0 = topCompleteFut;
-            }
+                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                    req.nodeId(),
+                    req.futureVersion());
 
-            if (fut0 != null)
-                fut0.onDone();
+                res.addFailedKeys(req.keys(), e);
 
-            return true;
+                onResult(req.nodeId(), res);
+            }
         }
 
-        return false;
-    }
+        /**
+         * @param topNodes Cache nodes.
+         * @return Mapping.
+         * @throws Exception If failed.
+         */
+        Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception {
+            Iterator<?> it = null;
 
-    /**
-     * Response callback.
-     *
-     * @param nodeId Node ID.
-     * @param res Update response.
-     */
-    public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
-        if (res.remapKeys() != null) {
-            assert !fastMap || cctx.kernalContext().clientNode();
+            if (vals != null)
+                it = vals.iterator();
 
-            Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
+            Iterator<GridCacheDrInfo> conflictPutValsIt = null;
 
-            mapOnTopology(remapKeys, true, nodeId);
+            if (conflictPutVals != null)
+                conflictPutValsIt = conflictPutVals.iterator();
 
-            return;
-        }
+            Iterator<GridCacheVersion> conflictRmvValsIt = null;
 
-        GridCacheReturn ret = res.returnValue();
+            if (conflictRmvVals != null)
+                conflictRmvValsIt = conflictRmvVals.iterator();
 
-        Boolean single0 = single;
+            Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
 
-        if (single0 != null && single0) {
-            assert singleNodeId.equals(nodeId) : "Invalid response received for single-node mapped future " +
-                "[singleNodeId=" + singleNodeId + ", nodeId=" + nodeId + ", res=" + res + ']';
+            // Create mappings first, then send messages.
+            for (Object key : keys) {
+                if (key == null)
+                    throw new NullPointerException("Null key.");
 
-            updateNear(singleReq, res);
+                Object val;
+                GridCacheVersion conflictVer;
+                long conflictTtl;
+                long conflictExpireTime;
 
-            if (res.error() != null) {
-                onDone(res.failedKeys() != null ?
-                    addFailedKeys(res.failedKeys(), singleReq.topologyVersion(), res.error()) : res.error());
-            }
-            else {
-                if (op == TRANSFORM) {
-                    if (ret != null)
-                        addInvokeResults(ret);
+                if (vals != null) {
+                    val = it.next();
+                    conflictVer = null;
+                    conflictTtl = CU.TTL_NOT_CHANGED;
+                    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
 
-                    onDone(opRes);
+                    if (val == null)
+                        throw new NullPointerException("Null value.");
                 }
-                else {
-                    GridCacheReturn opRes0 = opRes = ret;
+                else if (conflictPutVals != null) {
+                    GridCacheDrInfo conflictPutVal =  conflictPutValsIt.next();
 
-                    onDone(opRes0);
+                    val = conflictPutVal.value();
+                    conflictVer = conflictPutVal.version();
+                    conflictTtl =  conflictPutVal.ttl();
+                    conflictExpireTime = conflictPutVal.expireTime();
+                }
+                else if (conflictRmvVals != null) {
+                    val = null;
+                    conflictVer = conflictRmvValsIt.next();
+                    conflictTtl = CU.TTL_NOT_CHANGED;
+                    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
                 }
-            }
-        }
-        else {
-            GridNearAtomicUpdateRequest req = mappings.get(nodeId);
-
-            if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft.
-                updateNear(req, res);
-
-                if (res.error() != null)
-                    addFailedKeys(req.keys(), req.topologyVersion(), res.error());
                 else {
-                    if (op == TRANSFORM) {
-                        assert !req.fastMap();
-
-                        if (ret != null)
-                            addInvokeResults(ret);
-                    }
-                    else if (req.fastMap() && req.hasPrimary())
-                        opRes = ret;
+                    val = null;
+                    conflictVer = null;
+                    conflictTtl = CU.TTL_NOT_CHANGED;
+                    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
                 }
 
-                mappings.remove(nodeId);
-            }
-
-            checkComplete();
-        }
-    }
-
-    /**
-     * Updates near cache.
-     *
-     * @param req Update request.
-     * @param res Update response.
-     */
-    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
-        if (!nearEnabled || !req.hasPrimary())
-            return;
-
-        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
-        near.processNearAtomicUpdateResponse(req, res);
-    }
-
-    /**
-     * Maps future on ready topology.
-     *
-     * @param keys Keys to map.
-     * @param remap Boolean flag indicating if this is partial future remap.
-     * @param oldNodeId Old node ID if remap.
-     */
-    private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) {
-        cache.topology().readLock();
+                if (val == null && op != GridCacheOperation.DELETE)
+                    continue;
 
-        AffinityTopologyVersion topVer = null;
+                KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
 
-        try {
-            if (cache.topology().stopping()) {
-                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
-                    cache.name()));
+                if (remapKeys != null && !remapKeys.contains(cacheKey))
+                    continue;
 
-                return;
-            }
+                if (op != TRANSFORM)
+                    val = cctx.toCacheObject(val);
 
-            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+                Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
 
-            if (fut.isDone()) {
-                if (!fut.isCacheTopologyValid(cctx)) {
-                    onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
-                        cctx.name()));
+                if (affNodes.isEmpty())
+                    throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                        "(all partition nodes left the grid).");
 
-                    return;
-                }
+                int i = 0;
 
-                topVer = fut.topologyVersion();
-            }
-            else {
-                if (waitTopFut) {
-                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                                @Override public void run() {
-                                    mapOnTopology(keys, remap, oldNodeId);
-                                }
-                            });
-                        }
-                    });
-                }
-                else
-                    onDone(new GridCacheTryPutFailedException());
+                for (ClusterNode affNode : affNodes) {
+                    if (affNode == null)
+                        throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                            "(all partition nodes left the grid).");
 
-                return;
-            }
-        }
-        finally {
-            cache.topology().readUnlock();
-        }
+                    UUID nodeId = affNode.id();
 
-        map0(topVer, keys, remap, oldNodeId);
-    }
+                    GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
 
-    /**
-     * Checks if future is ready to be completed.
-     */
-    private void checkComplete() {
-        boolean remap = false;
+                    if (mapped == null) {
+                        mapped = new GridNearAtomicUpdateRequest(
+                            cctx.cacheId(),
+                            nodeId,
+                            futVer,
+                            fastMap,
+                            updVer,
+                            topVer,
+                            topLocked,
+                            syncMode,
+                            op,
+                            retval,
+                            expiryPlc,
+                            invokeArgs,
+                            filter,
+                            subjId,
+                            taskNameHash,
+                            skipStore,
+                            cctx.kernalContext().clientNode());
 
-        synchronized (this) {
-            if (topVer != AffinityTopologyVersion.ZERO &&
-                ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty())) {
-                CachePartialUpdateCheckedException err0 = err;
+                        pendingMappings.put(nodeId, mapped);
+                    }
 
-                if (err0 != null)
-                    onDone(err0);
-                else {
-                    if (fastMapRemap) {
-                        assert cctx.kernalContext().clientNode();
+                    mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
 
-                        remap = true;
-                    }
-                    else
-                        onDone(opRes);
+                    i++;
                 }
             }
-        }
-
-        if (remap)
-            mapOnTopology(null, true, null);
-    }
 
-    /**
-     * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
-     */
-    private boolean storeFuture() {
-        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
-    }
-
-    /**
-     * @param topVer Topology version.
-     * @param remapKeys Keys to remap or {@code null} to map all keys.
-     * @param remap Flag indicating if this is partial remap for this future.
-     * @param oldNodeId Old node ID if was remap.
-     */
-    private void map0(
-        AffinityTopologyVersion topVer,
-        @Nullable Collection<?> remapKeys,
-        boolean remap,
-        @Nullable UUID oldNodeId) {
-        assert oldNodeId == null || remap || fastMapRemap;
-
-        Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
-
-        if (F.isEmpty(topNodes)) {
-            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
-                "left the grid)."));
-
-            return;
+            return pendingMappings;
         }
 
-        if (futVer == null)
-            // Assign future version in topology read lock before first exception may be thrown.
-            futVer = cctx.versions().next(topVer);
-
-        if (!remap && storeFuture())
-            cctx.mvcc().addAtomicFuture(version(), this);
-
-        CacheConfiguration ccfg = cctx.config();
-
-        // Assign version on near node in CLOCK ordering mode even if fastMap is false.
-        if (updVer == null)
-            updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
-
-        if (updVer != null && log.isDebugEnabled())
-            log.debug("Assigned fast-map version for update on near node: " + updVer);
-
-        if (keys.size() == 1 && !fastMap && (single == null || single)) {
-            assert remapKeys == null || remapKeys.size() == 1 : remapKeys;
-
+        /**
+         * @return Request.
+         * @throws Exception If failed.
+         */
+        GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception {
             Object key = F.first(keys);
 
             Object val;
@@ -773,21 +560,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
 
             // We still can get here if user pass map with single element.
-            if (key == null) {
-                NullPointerException err = new NullPointerException("Null key.");
-
-                onDone(err);
+            if (key == null)
+                throw new NullPointerException("Null key.");
 
-                return;
-            }
-
-            if (val == null && op != GridCacheOperation.DELETE) {
-                NullPointerException err = new NullPointerException("Null value.");
-
-                onDone(err);
-
-                return;
-            }
+            if (val == null && op != GridCacheOperation.DELETE)
+                throw new NullPointerException("Null value.");
 
             KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
 
@@ -796,12 +573,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
             ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
 
-            if (primary == null) {
-                onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
-                    "left the grid)."));
-
-                return;
-            }
+            if (primary == null)
+                throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+                    "left the grid).");
 
             GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
                 cctx.cacheId(),
@@ -829,184 +603,409 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 conflictVer,
                 true);
 
-            synchronized (this) {
-                this.topVer = topVer;
+            return req;
+        }
+
+        /**
+         * @param topVer Topology version.
+         */
+        void map(AffinityTopologyVersion topVer) {
+            Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+
+            if (F.isEmpty(topNodes)) {
+                onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+                    "left the grid)."));
+
+                return;
+            }
+
+            Exception err = null;
+            Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
+
+            int size = keys().size();
+
+            synchronized (this) {
+                this.topVer = topVer;
+
+                futVer = cctx.versions().next(topVer);
+
+                if (storeFuture())
+                    cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this);
+
+                try {
+                    if (size == 1 && !fastMap) {
+                        assert remapKeys == null || remapKeys.size() == 1;
+
+                        singleReq = mapSingleUpdate();
+                    }
+                    else {
+                        pendingMappings = mapUpdate(topNodes);
+
+                        if (pendingMappings.size() == 1)
+                            singleReq = F.firstValue(pendingMappings);
+                        else {
+                            if (syncMode == PRIMARY_SYNC) {
+                                mappings = U.newHashMap(pendingMappings.size());
+
+                                for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+                                    if (req.hasPrimary())
+                                        mappings.put(req.nodeId(), req);
+                                }
+                            }
+                            else
+                                mappings = pendingMappings;
+
+                            assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
+                        }
+                    }
+
+                    remapKeys = null;
+                }
+                catch (Exception e) {
+                    err = e;
+                }
+            }
+
+            if (err != null) {
+                onDone(err);
+
+                return;
+            }
+
+            // Optimize mapping for single key.
+            if (singleReq != null)
+                mapSingle(singleReq.nodeId(), singleReq);
+            else {
+                assert pendingMappings != null;
+
+                if (size == 0)
+                    onDone(new GridCacheReturn(cctx, true, null, true));
+                else
+                    doUpdate(pendingMappings);
+            }
+        }
+
+        /**
+         * @param topVer Topology version.
+         * @return Future.
+         */
+        @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
+            if (this.topVer == AffinityTopologyVersion.ZERO)
+                return null;
+
+            if (this.topVer.compareTo(topVer) < 0) {
+                if (topCompleteFut == null)
+                    topCompleteFut = new GridFutureAdapter<>();
+
+                return topCompleteFut;
+            }
+
+            return null;
+        }
+
+        /**
+         * @return Future version.
+         */
+        GridCacheVersion onFutureDone() {
+            GridCacheVersion ver0;
+
+            GridFutureAdapter<Void> fut0;
+
+            synchronized (this) {
+                fut0 = topCompleteFut;
+
+                ver0 = futVer;
+
+                futVer = null;
+            }
+
+            if (fut0 != null)
+                fut0.onDone();
+
+            return ver0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized  String toString() {
+            return S.toString(UpdateState.class, this);
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param cache Cache instance.
+     * @param syncMode Write synchronization mode.
+     * @param op Update operation.
+     * @param keys Keys to update.
+     * @param vals Values or transform closure.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @param conflictPutVals Conflict put values (optional).
+     * @param conflictRmvVals Conflict remove values (optional).
+     * @param retval Return value require flag.
+     * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
+     * @param expiryPlc Expiry policy explicitly specified for cache operation.
+     * @param filter Entry filter.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip store flag.
+     */
+    public GridNearAtomicUpdateFuture(
+        GridCacheContext cctx,
+        GridDhtAtomicCache cache,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        Collection<?> keys,
+        @Nullable Collection<?> vals,
+        @Nullable Object[] invokeArgs,
+        @Nullable Collection<GridCacheDrInfo> conflictPutVals,
+        @Nullable Collection<GridCacheVersion> conflictRmvVals,
+        final boolean retval,
+        final boolean rawRetval,
+        @Nullable ExpiryPolicy expiryPlc,
+        final CacheEntryPredicate[] filter,
+        UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        int remapCnt,
+        boolean waitTopFut
+    ) {
+        this.rawRetval = rawRetval;
+
+        assert vals == null || vals.size() == keys.size();
+        assert conflictPutVals == null || conflictPutVals.size() == keys.size();
+        assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
+        assert subjId != null;
+
+        this.cctx = cctx;
+        this.cache = cache;
+        this.syncMode = syncMode;
+        this.op = op;
+        this.keys = keys;
+        this.vals = vals;
+        this.invokeArgs = invokeArgs;
+        this.conflictPutVals = conflictPutVals;
+        this.conflictRmvVals = conflictRmvVals;
+        this.retval = retval;
+        this.expiryPlc = expiryPlc;
+        this.filter = filter;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.skipStore = skipStore;
+        this.waitTopFut = waitTopFut;
+
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
+
+        fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+            cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+            !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
+
+        nearEnabled = CU.isNearEnabled(cctx);
 
-                single = true;
-            }
+        if (!waitTopFut)
+            remapCnt = 1;
 
-            // Optimize mapping for single key.
-            mapSingle(primary.id(), req);
+        this.remapCnt = new AtomicInteger(remapCnt);
 
-            return;
-        }
+        state = new UpdateState();
+    }
 
-        Iterator<?> it = null;
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        return state.futureId();
+    }
 
-        if (vals != null)
-            it = vals.iterator();
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion version() {
+        return state.futureVersion();
+    }
 
-        Iterator<GridCacheDrInfo> conflictPutValsIt = null;
+    /** {@inheritDoc} */
+    @Override public Collection<? extends ClusterNode> nodes() {
+        throw new UnsupportedOperationException();
+    }
 
-        if (conflictPutVals != null)
-            conflictPutValsIt = conflictPutVals.iterator();
+    /**
+     * @return {@code True} if this future should block partition map exchange.
+     */
+    private boolean waitForPartitionExchange() {
+        // Wait fast-map near atomic update futures in CLOCK mode.
+        return fastMap;
+    }
 
-        Iterator<GridCacheVersion> conflictRmvValsIt = null;
+    /** {@inheritDoc} */
+    @Override public Collection<?> keys() {
+        return keys;
+    }
 
-        if (conflictRmvVals != null)
-            conflictRmvValsIt = conflictRmvVals.iterator();
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        state.onNodeLeft(nodeId);
 
-        Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
+        return false;
+    }
 
-        // Must do this in synchronized block because we need to atomically remove and add mapping.
-        // Otherwise checkComplete() may see empty intermediate state.
-        synchronized (this) {
-            if (oldNodeId != null)
-                removeMapping(oldNodeId);
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return true;
+    }
 
-            // For fastMap mode wait for all responses before remapping.
-            if (remap && fastMap && !mappings.isEmpty()) {
-                fastMapRemap = true;
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        // No-op.
+    }
 
-                return;
-            }
+    /**
+     * Performs future mapping.
+     */
+    public void map() {
+        AffinityTopologyVersion topVer = null;
 
-            // Create mappings first, then send messages.
-            for (Object key : keys) {
-                if (key == null) {
-                    NullPointerException err = new NullPointerException("Null key.");
+        IgniteInternalTx tx = cctx.tm().anyActiveThreadTx();
 
-                    onDone(err);
+        if (tx != null && tx.topologyVersionSnapshot() != null)
+            topVer = tx.topologyVersionSnapshot();
 
-                    return;
-                }
+        if (topVer == null)
+            topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
 
-                Object val;
-                GridCacheVersion conflictVer;
-                long conflictTtl;
-                long conflictExpireTime;
+        if (topVer == null)
+            mapOnTopology();
+        else {
+            topLocked = true;
 
-                if (vals != null) {
-                    val = it.next();
-                    conflictVer = null;
-                    conflictTtl = CU.TTL_NOT_CHANGED;
-                    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+            // Cannot remap.
+            remapCnt.set(1);
 
-                    if (val == null) {
-                        NullPointerException err = new NullPointerException("Null value.");
+            state.map(topVer);
+        }
+    }
 
-                        onDone(err);
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+        if (waitForPartitionExchange()) {
+            GridFutureAdapter<Void> fut = state.completeFuture(topVer);
 
-                        return;
-                    }
-                }
-                else if (conflictPutVals != null) {
-                    GridCacheDrInfo conflictPutVal =  conflictPutValsIt.next();
+            if (fut != null && isDone()) {
+                fut.onDone();
 
-                    val = conflictPutVal.value();
-                    conflictVer = conflictPutVal.version();
-                    conflictTtl =  conflictPutVal.ttl();
-                    conflictExpireTime = conflictPutVal.expireTime();
-                }
-                else if (conflictRmvVals != null) {
-                    val = null;
-                    conflictVer = conflictRmvValsIt.next();
-                    conflictTtl = CU.TTL_NOT_CHANGED;
-                    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-                }
-                else {
-                    val = null;
-                    conflictVer = null;
-                    conflictTtl = CU.TTL_NOT_CHANGED;
-                    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-                }
+                return null;
+            }
 
-                if (val == null && op != GridCacheOperation.DELETE)
-                    continue;
+            return fut;
+        }
 
-                KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+        return null;
+    }
 
-                if (remapKeys != null && !remapKeys.contains(cacheKey))
-                    continue;
+    /** {@inheritDoc} */
+    @SuppressWarnings("ConstantConditions")
+    @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+        assert res == null || res instanceof GridCacheReturn;
 
-                if (op != TRANSFORM)
-                    val = cctx.toCacheObject(val);
+        GridCacheReturn ret = (GridCacheReturn)res;
 
-                Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+        Object retval =
+            res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? ret.value() : ret.success();
 
-                if (affNodes.isEmpty()) {
-                    onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                        "(all partition nodes left the grid)."));
+        if (op == TRANSFORM && retval == null)
+            retval = Collections.emptyMap();
 
-                    return;
-                }
+        if (super.onDone(retval, err)) {
+            GridCacheVersion futVer = state.onFutureDone();
 
-                int i = 0;
+            if (futVer != null)
+                cctx.mvcc().removeAtomicFuture(futVer);
 
-                for (ClusterNode affNode : affNodes) {
-                    if (affNode == null) {
-                        onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                            "(all partition nodes left the grid)."));
+            return true;
+        }
 
-                        return;
-                    }
+        return false;
+    }
 
-                    UUID nodeId = affNode.id();
+    /**
+     * Response callback.
+     *
+     * @param nodeId Node ID.
+     * @param res Update response.
+     */
+    public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+        state.onResult(nodeId, res);
+    }
 
-                    GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+    /**
+     * Updates near cache.
+     *
+     * @param req Update request.
+     * @param res Update response.
+     */
+    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+        if (!nearEnabled || !req.hasPrimary())
+            return;
 
-                    if (mapped == null) {
-                        mapped = new GridNearAtomicUpdateRequest(
-                            cctx.cacheId(),
-                            nodeId,
-                            futVer,
-                            fastMap,
-                            updVer,
-                            topVer,
-                            topLocked,
-                            syncMode,
-                            op,
-                            retval,
-                            expiryPlc,
-                            invokeArgs,
-                            filter,
-                            subjId,
-                            taskNameHash,
-                            skipStore,
-                            cctx.kernalContext().clientNode());
+        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
 
-                        pendingMappings.put(nodeId, mapped);
+        near.processNearAtomicUpdateResponse(req, res);
+    }
 
-                        GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped);
+    /**
+     * Maps future on ready topology.
+     */
+    private void mapOnTopology() {
+        cache.topology().readLock();
 
-                        assert old == null || (old != null && remap) :
-                            "Invalid mapping state [old=" + old + ", remap=" + remap + ']';
-                    }
+        AffinityTopologyVersion topVer = null;
 
-                    mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+        try {
+            if (cache.topology().stopping()) {
+                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                    cache.name()));
 
-                    i++;
-                }
+                return;
             }
 
-            this.topVer = topVer;
-
-            fastMapRemap = false;
-        }
+            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
 
-        if ((single == null || single) && pendingMappings.size() == 1) {
-            Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
+            if (fut.isDone()) {
+                if (!fut.isCacheTopologyValid(cctx)) {
+                    onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
+                        cctx.name()));
 
-            single = true;
+                    return;
+                }
 
-            mapSingle(entry.getKey(), entry.getValue());
+                topVer = fut.topologyVersion();
+            }
+            else {
+                if (waitTopFut) {
+                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                                @Override public void run() {
+                                    mapOnTopology();
+                                }
+                            });
+                        }
+                    });
+                }
+                else
+                    onDone(new GridCacheTryPutFailedException());
 
-            return;
+                return;
+            }
+        }
+        finally {
+            cache.topology().readUnlock();
         }
-        else
-            single = false;
 
-        doUpdate(pendingMappings);
+        state.map(topVer);
+    }
+
+    /**
+     * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
+     */
+    private boolean storeFuture() {
+        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
     }
 
     /**
@@ -1038,16 +1037,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param req Request.
      */
     private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
-        singleNodeId = nodeId;
-        singleReq = req;
-
         if (cctx.localNodeId().equals(nodeId)) {
             cache.updateAllAsyncInternal(nodeId, req,
                 new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest req,
-                        GridNearAtomicUpdateResponse res) {
-                        assert res.futureVersion().equals(futVer) : futVer;
-
+                    @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
                         onResult(res.nodeId(), res);
                     }
                 });
@@ -1059,11 +1052,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                 cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
-                if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
+                if (syncMode == FULL_ASYNC)
                     onDone(new GridCacheReturn(cctx, true, null, true));
             }
             catch (IgniteCheckedException e) {
-                onDone(addFailedKeys(req.keys(), req.topologyVersion(), e));
+                state.onSendError(req, e);
             }
         }
     }
@@ -1094,82 +1087,22 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
                 }
                 catch (IgniteCheckedException e) {
-                    addFailedKeys(req.keys(), req.topologyVersion(), e);
-
-                    removeMapping(req.nodeId());
+                    state.onSendError(req, e);
                 }
-
-                if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
-                    removeMapping(req.nodeId());
             }
         }
 
-        if (syncMode == FULL_ASYNC)
-            // In FULL_ASYNC mode always return (null, true).
-            opRes = new GridCacheReturn(cctx, true, null, true);
-
         if (locUpdate != null) {
             cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
                 new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest req,
-                        GridNearAtomicUpdateResponse res) {
-                        assert res.futureVersion().equals(futVer) : futVer;
-
+                    @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
                         onResult(res.nodeId(), res);
                     }
                 });
         }
 
-        checkComplete();
-    }
-
-    /**
-     * Removes mapping from future mappings map.
-     *
-     * @param nodeId Node ID to remove mapping for.
-     */
-    private void removeMapping(UUID nodeId) {
-        mappings.remove(nodeId);
-    }
-
-    /**
-     * @param ret Result from single node.
-     */
-    @SuppressWarnings("unchecked")
-    private synchronized void addInvokeResults(GridCacheReturn ret) {
-        assert op == TRANSFORM : op;
-        assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
-        if (ret.value() != null) {
-            if (opRes != null)
-                opRes.mergeEntryProcessResults(ret);
-            else
-                opRes = ret;
-        }
-    }
-
-    /**
-     * @param failedKeys Failed keys.
-     * @param topVer Topology version for failed update.
-     * @param err Error cause.
-     * @return Root {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException}.
-     */
-    private synchronized IgniteCheckedException addFailedKeys(Collection<KeyCacheObject> failedKeys,
-        AffinityTopologyVersion topVer,
-        Throwable err) {
-        CachePartialUpdateCheckedException err0 = this.err;
-
-        if (err0 == null)
-            err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
-
-        Collection<Object> keys = new ArrayList<>(failedKeys.size());
-
-        for (KeyCacheObject key : failedKeys)
-            keys.add(key.value(cctx.cacheObjectContext(), false));
-
-        err0.add(keys, err, topVer);
-
-        return err0;
+        if (syncMode == FULL_ASYNC)
+            onDone(new GridCacheReturn(cctx, true, null, true));
     }
 
     /** {@inheritDoc} */


Mime
View raw message