ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [32/40] ignite git commit: ignite-4705 Atomic cache protocol change: notify client node from backups
Date Tue, 14 Mar 2017 15:00:26 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index deb9ce4..2826215 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -18,26 +18,65 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.io.Externalizable;
+import java.nio.ByteBuffer;
 import java.util.UUID;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
 public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+    /** Skip store flag bit mask. */
+    private static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01;
+
+    /** Keep binary flag. */
+    private static final int DHT_ATOMIC_KEEP_BINARY_FLAG_MASK = 0x02;
+
+    /** Near cache key flag. */
+    private static final int DHT_ATOMIC_NEAR_FLAG_MASK = 0x04;
+
+    /** Has result flag bit mask. */
+    static final int DHT_ATOMIC_HAS_RESULT_MASK = 0x08;
+
+    /** Reply without delay flag bit mask. */
+    private static final int DHT_ATOMIC_REPLY_WITHOUT_DELAY = 0x10;
+
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
+    /** Future ID on primary. */
+    protected long futId;
+
+    /** Write version. */
+    protected GridCacheVersion writeVer;
+
+    /** Write synchronization mode. */
+    protected CacheWriteSynchronizationMode syncMode;
+
+    /** Topology version. */
+    protected AffinityTopologyVersion topVer;
+
+    /** Subject ID. */
+    protected UUID subjId;
+
+    /** Task name hash. */
+    protected int taskNameHash;
+
     /** Node ID. */
     @GridDirectTransient
     protected UUID nodeId;
@@ -46,6 +85,15 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     @GridDirectTransient
     private boolean onRes;
 
+    /** Near node ID. */
+    private UUID nearNodeId;
+
+    /** Future ID on near node. */
+    private long nearFutId;
+
+    /** Additional flags. */
+    protected byte flags;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -59,9 +107,68 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
      */
-    protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId) {
+    protected GridDhtAtomicAbstractUpdateRequest(int cacheId,
+        UUID nodeId,
+        long futId,
+        GridCacheVersion writeVer,
+        CacheWriteSynchronizationMode syncMode,
+        @NotNull AffinityTopologyVersion topVer,
+        UUID subjId,
+        int taskNameHash,
+        boolean addDepInfo,
+        boolean keepBinary,
+        boolean skipStore
+    ) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
+        this.futId = futId;
+        this.writeVer = writeVer;
+        this.syncMode = syncMode;
+        this.topVer = topVer;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.addDepInfo = addDepInfo;
+
+        if (skipStore)
+            setFlag(true, DHT_ATOMIC_SKIP_STORE_FLAG_MASK);
+        if (keepBinary)
+            setFlag(true, DHT_ATOMIC_KEEP_BINARY_FLAG_MASK);
+    }
+
+    void nearReplyInfo(UUID nearNodeId, long nearFutId) {
+        assert nearNodeId != null;
+
+        this.nearNodeId = nearNodeId;
+        this.nearFutId = nearFutId;
+    }
+
+    boolean replyWithoutDelay() {
+        return isFlag(DHT_ATOMIC_REPLY_WITHOUT_DELAY);
+    }
+
+    void replyWithoutDelay(boolean replyWithoutDelay) {
+        setFlag(replyWithoutDelay, DHT_ATOMIC_REPLY_WITHOUT_DELAY);
+    }
+
+    /**
+     * @param res Result flag.
+     */
+    void hasResult(boolean res) {
+        setFlag(res, DHT_ATOMIC_HAS_RESULT_MASK);
+    }
+
+    /**
+     * @return Result flag.
+     */
+    private boolean hasResult() {
+        return isFlag(DHT_ATOMIC_HAS_RESULT_MASK);
+    }
+
+    /**
+     * @return Near node ID.
+     */
+    public UUID nearNodeId() {
+        return nearNodeId;
     }
 
     /** {@inheritDoc} */
@@ -77,14 +184,25 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     }
 
     /**
+     * @return Flags.
+     */
+    public final byte flags() {
+        return flags;
+    }
+
+    /**
      * @return Keep binary flag.
      */
-    public abstract boolean keepBinary();
+    public final boolean keepBinary() {
+        return isFlag(DHT_ATOMIC_KEEP_BINARY_FLAG_MASK);
+    }
 
     /**
      * @return Skip write-through to a persistent storage.
      */
-    public abstract boolean skipStore();
+    public final boolean skipStore() {
+        return isFlag(DHT_ATOMIC_SKIP_STORE_FLAG_MASK);
+    }
 
     /**
      * @return {@code True} if on response flag changed.
@@ -93,6 +211,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
         return !onRes && (onRes = true);
     }
 
+    /**
+     * @return {@code True} if response was received.
+     */
+    boolean hasResponse() {
+        return onRes;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean addDeploymentInfo() {
         return addDepInfo;
@@ -121,7 +246,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
      * @param addPrevVal If {@code true} adds previous value.
-     * @param partId Partition.
      * @param prevVal Previous value.
      * @param updateCntr Update counter.
      */
@@ -132,7 +256,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
         long conflictExpireTime,
         @Nullable GridCacheVersion conflictVer,
         boolean addPrevVal,
-        int partId,
         @Nullable CacheObject prevVal,
         long updateCntr
     );
@@ -158,27 +281,44 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     /**
      * @return Subject ID.
      */
-    public abstract UUID subjectId();
+    public final UUID subjectId() {
+        return subjId;
+    }
 
     /**
      * @return Task name.
      */
-    public abstract int taskNameHash();
+    public final int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /**
+     * @return Future ID on primary node.
+     */
+    public final long futureId() {
+        return futId;
+    }
 
     /**
-     * @return Version assigned on primary node.
+     * @return Future ID on near node.
      */
-    public abstract GridCacheVersion futureVersion();
+    public final long nearFutureId() {
+        return nearFutId;
+    }
 
     /**
      * @return Write version.
      */
-    public abstract GridCacheVersion writeVersion();
+    public final GridCacheVersion writeVersion() {
+        return writeVer;
+    }
 
     /**
      * @return Cache write synchronization mode.
      */
-    public abstract CacheWriteSynchronizationMode writeSynchronizationMode();
+    public final CacheWriteSynchronizationMode writeSynchronizationMode() {
+        return syncMode;
+    }
 
     /**
      * @return Keys size.
@@ -203,12 +343,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     public abstract KeyCacheObject key(int idx);
 
     /**
-     * @param idx Partition index.
-     * @return Partition id.
-     */
-    public abstract int partitionId(int idx);
-
-    /**
      * @param updCntr Update counter.
      * @return Update counter.
      */
@@ -284,4 +418,228 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
      * @return Optional arguments for entry processor.
      */
     @Nullable public abstract Object[] invokeArguments();
+
+    /**
+     * @return {@code True} if near cache update request.
+     */
+    protected final boolean near() {
+        return isFlag(DHT_ATOMIC_NEAR_FLAG_MASK);
+    }
+
+    /**
+     * @param near Near cache update flag.
+     */
+    protected final void near(boolean near) {
+        setFlag(near, DHT_ATOMIC_NEAR_FLAG_MASK);
+    }
+
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    protected final void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reads flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    final boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 12;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeLong("nearFutId", nearFutId))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeUuid("nearNodeId", nearNodeId))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeMessage("writeVer", writeVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                nearFutId = reader.readLong("nearFutId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                nearNodeId = reader.readUuid("nearNodeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                byte syncModeOrd;
+
+                syncModeOrd = reader.readByte("syncMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+                reader.incrementState();
+
+            case 9:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                writeVer = reader.readMessage("writeVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtAtomicAbstractUpdateRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        StringBuilder flags = new StringBuilder();
+
+        if (skipStore())
+            appendFlag(flags, "skipStore");
+        if (keepBinary())
+            appendFlag(flags, "keepBinary");
+        if (near())
+            appendFlag(flags, "near");
+        if (hasResult())
+            appendFlag(flags, "hasRes");
+        if (replyWithoutDelay())
+            appendFlag(flags, "resNoDelay");
+
+        return S.toString(GridDhtAtomicAbstractUpdateRequest.class, this,
+            "flags", flags.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index cebf4ae..c20ed48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -38,6 +38,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
@@ -57,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
-import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -80,6 +80,8 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -90,8 +92,6 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -99,16 +99,14 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
@@ -134,12 +132,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private static final int DEFERRED_UPDATE_RESPONSE_TIMEOUT =
         Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
 
+    /** */
+    private final ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>> defRes =
+        new ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>>() {
+            @Override protected Map<UUID, GridDhtAtomicDeferredUpdateResponse> initialValue() {
+                return new HashMap<>();
+            }
+        };
+
     /** Update reply closure. */
     @GridToStringExclude
-    private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
-
-    /** Pending */
-    private GridDeferredAckMessageSender deferredUpdateMsgSnd;
+    private UpdateReplyClosure updateReplyClos;
 
     /** */
     private GridNearAtomicCache<K, V> near;
@@ -205,25 +208,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override protected void init() {
         super.init();
 
-        updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
+        updateReplyClos = new UpdateReplyClosure() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
             @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
-                if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
-                    assert req.writeSynchronizationMode() != FULL_ASYNC : req;
-
-                    // Always send reply in CLOCK ordering mode.
-                    sendNearUpdateReply(res.nodeId(), res);
-
-                    return;
-                }
-
-                // Request should be for primary keys only in PRIMARY ordering mode.
-                assert req.hasPrimary() : req;
-
                 if (req.writeSynchronizationMode() != FULL_ASYNC)
                     sendNearUpdateReply(res.nodeId(), res);
                 else {
-                    if (!F.isEmpty(res.remapKeys()))
+                    if (res.remapTopologyVersion() != null)
                         // Remap keys on primary node in FULL_ASYNC mode.
                         remapToNewPrimary(req);
                     else if (res.error() != null) {
@@ -240,53 +231,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        deferredUpdateMsgSnd = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) {
-            @Override public int getTimeout() {
-                return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
-            }
-
-            @Override public int getBufferSize() {
-                return DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE;
-            }
-
-            @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) {
-                GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
-                    vers, ctx.deploymentEnabled());
-
-                try {
-                    ctx.kernalContext().gateway().readLock();
-
-                    try {
-                        ctx.io().send(nodeId, msg, ctx.ioPolicy());
-
-                        if (msgLog.isDebugEnabled()) {
-                            msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() +
-                                ", node=" + nodeId + ']');
-                        }
-                    }
-                    finally {
-                        ctx.kernalContext().gateway().readUnlock();
-                    }
-                }
-                catch (IllegalStateException ignored) {
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("Failed to send deferred DHT update response, node is stopping [" +
-                            "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
-                    }
-                }
-                catch (ClusterTopologyCheckedException ignored) {
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("Failed to send deferred DHT update response, node left [" +
-                            "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
-                    }
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send deferred DHT update response to remote node [" +
-                        "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e);
-                }
-            }
-        };
-
         CacheMetricsImpl m = new CacheMetricsImpl(ctx);
 
         if (ctx.dht().near() != null)
@@ -419,6 +363,32 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
+        ctx.io().addHandler(ctx.cacheId(),
+            GridDhtAtomicNearResponse.class,
+            new CI2<UUID, GridDhtAtomicNearResponse>() {
+            @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) {
+                processDhtAtomicNearResponse(uuid, msg);
+            }
+
+            @Override public String toString() {
+                return "GridDhtAtomicNearResponse handler " +
+                    "[msgIdx=" + GridDhtAtomicNearResponse.CACHE_MSG_IDX + ']';
+            }
+        });
+
+        ctx.io().addHandler(ctx.cacheId(),
+            GridNearAtomicCheckUpdateRequest.class,
+            new CI2<UUID, GridNearAtomicCheckUpdateRequest>() {
+                @Override public void apply(UUID uuid, GridNearAtomicCheckUpdateRequest msg) {
+                    processCheckUpdateRequest(uuid, msg);
+                }
+
+                @Override public String toString() {
+                    return "GridNearAtomicCheckUpdateRequest handler " +
+                        "[msgIdx=" + GridNearAtomicCheckUpdateRequest.CACHE_MSG_IDX + ']';
+                }
+            });
+
         if (near == null) {
             ctx.io().addHandler(
                 ctx.cacheId(),
@@ -450,11 +420,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void stop() {
-        deferredUpdateMsgSnd.stop();
-    }
-
     /**
      * @param near Near cache.
      */
@@ -1341,9 +1306,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         CacheEntryPredicate[] filters = CU.filterArray(filter);
 
-        if (conflictPutVal == null &&
-            conflictRmvVer == null &&
-            !isFastMap(filters, op)) {
+        if (conflictPutVal == null && conflictRmvVer == null) {
             return new GridNearAtomicSingleUpdateFuture(
                 ctx,
                 this,
@@ -1389,19 +1352,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
-     * Whether this is fast-map operation.
-     *
-     * @param filters Filters.
-     * @param op Operation.
-     * @return {@code True} if fast-map.
-     */
-    public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation op) {
-        return F.isEmpty(filters) && op != TRANSFORM && ctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
-            ctx.config().getAtomicWriteOrderMode() == CLOCK &&
-            !(ctx.writeThrough() && ctx.config().getInterceptor() != null);
-    }
-
-    /**
      * Entry point for all public API remove methods.
      *
      * @param keys Keys to remove.
@@ -1696,10 +1646,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param req Update request.
      * @param completionCb Completion callback.
      */
-    public void updateAllAsyncInternal(
+    void updateAllAsyncInternal(
         final UUID nodeId,
         final GridNearAtomicAbstractUpdateRequest req,
-        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+        final UpdateReplyClosure completionCb
     ) {
         IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
 
@@ -1748,12 +1698,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private void onForceKeysError(final UUID nodeId,
         final GridNearAtomicAbstractUpdateRequest req,
-        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        final UpdateReplyClosure completionCb,
         IgniteCheckedException e
     ) {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
             nodeId,
-            req.futureVersion(),
+            req.futureId(),
+            req.partition(),
+            false,
             ctx.deploymentEnabled());
 
         res.addFailedKeys(req.keys(), e);
@@ -1771,12 +1723,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private void updateAllAsyncInternal0(
         UUID nodeId,
         GridNearAtomicAbstractUpdateRequest req,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+        UpdateReplyClosure completionCb
     ) {
-        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
-            ctx.deploymentEnabled());
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node == null) {
+            U.warn(msgLog, "Skip near update request, node originated update request left [" +
+                "futId=" + req.futureId() + ", node=" + nodeId + ']');
 
-        res.partition(req.partition());
+            return;
+        }
+
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+            nodeId,
+            req.futureId(),
+            req.partition(),
+            false,
+            ctx.deploymentEnabled());
 
         assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
 
@@ -1791,7 +1754,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         try {
             // If batch store update is enabled, we need to lock all entries.
             // First, need to acquire locks on cache entries, then check filter.
-            List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion());
+            List<GridDhtCacheEntry> locked = null;
 
             Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
 
@@ -1810,43 +1773,29 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         return;
                     }
 
-                    // Do not check topology version for CLOCK versioning since
-                    // partition exchange will wait for near update future (if future is on server node).
-                    // Also do not check topology version if topology was locked on near node by
+                    // Do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
-                        !needRemap(req.topologyVersion(), top.topologyVersion())) {
-                        ClusterNode node = ctx.discovery().node(nodeId);
-
-                        if (node == null) {
-                            U.warn(msgLog, "Skip near update request, node originated update request left [" +
-                                "futId=" + req.futureVersion() + ", node=" + nodeId + ']');
-
-                            return;
-                        }
+                    if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+                        locked = lockEntries(req, req.topologyVersion());
 
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
-                        GridCacheVersion ver = req.updateVersion();
+                        // Assign next version for update inside entries lock.
+                        GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
 
-                        if (ver == null) {
-                            // Assign next version for update inside entries lock.
-                            ver = ctx.versions().next(top.topologyVersion());
+                        if (hasNear)
+                            res.nearVersion(ver);
 
-                            if (hasNear)
-                                res.nearVersion(ver);
-
-                            if (msgLog.isDebugEnabled()) {
-                                msgLog.debug("Assigned update version [futId=" + req.futureVersion() +
-                                    ", writeVer=" + ver + ']');
-                            }
+                        if (msgLog.isDebugEnabled()) {
+                            msgLog.debug("Assigned update version [futId=" + req.futureId() +
+                                ", writeVer=" + ver + ']');
                         }
 
                         assert ver != null : "Got null version for update request: " + req;
 
                         boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
 
-                        dhtFut = createDhtFuture(ver, req, res, completionCb, false);
+                        dhtFut = createDhtFuture(ver, req);
 
                         expiry = expiryPolicy(req.expiry());
 
@@ -1866,7 +1815,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 locked,
                                 ver,
                                 dhtFut,
-                                completionCb,
                                 ctx.isDrEnabled(),
                                 taskName,
                                 expiry,
@@ -1886,7 +1834,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 locked,
                                 ver,
                                 dhtFut,
-                                completionCb,
                                 ctx.isDrEnabled(),
                                 taskName,
                                 expiry,
@@ -1902,15 +1849,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         res.returnValue(retVal);
 
-                        if (req.writeSynchronizationMode() != FULL_ASYNC)
-                            req.cleanup(!node.isLocal());
-
                         if (dhtFut != null)
-                            ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut);
+                            ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
                     }
-                    else
+                    else {
                         // Should remap all keys.
                         remap = true;
+
+                        res.remapTopologyVersion(top.topologyVersion());
+                    }
                 }
                 finally {
                     top.readUnlock();
@@ -1936,12 +1883,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         catch (GridDhtInvalidPartitionException ignore) {
-            assert !req.fastMap() || req.clientRequest() : req;
-
             if (log.isDebugEnabled())
                 log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
 
             remap = true;
+
+            res.remapTopologyVersion(ctx.topology().topologyVersion());
         }
         catch (Throwable e) {
             // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is
@@ -1961,18 +1908,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         if (remap) {
             assert dhtFut == null;
 
-            res.remapKeys(req.keys());
-
             completionCb.apply(req, res);
         }
-        else {
-            // If there are backups, map backup update future.
+        else
             if (dhtFut != null)
-                dhtFut.map();
-                // Otherwise, complete the call.
-            else
-                completionCb.apply(req, res);
-        }
+                dhtFut.map(node, res.returnValue(), res, completionCb);
+
+        if (req.writeSynchronizationMode() != FULL_ASYNC)
+            req.cleanup(!node.isLocal());
 
         sendTtlUpdateRequest(expiry);
     }
@@ -1987,7 +1930,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param locked Locked entries.
      * @param ver Assigned version.
      * @param dhtFut Optional DHT future.
-     * @param completionCb Completion callback to invoke when DHT future is completed.
      * @param replicate Whether replication is enabled.
      * @param taskName Task name.
      * @param expiry Expiry policy.
@@ -2004,7 +1946,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final List<GridDhtCacheEntry> locked,
         final GridCacheVersion ver,
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
-        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         final boolean replicate,
         final String taskName,
         @Nullable final IgniteCacheExpiryPolicy expiry,
@@ -2049,9 +1990,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         for (int i = 0; i < locked.size(); i++) {
             GridDhtCacheEntry entry = locked.get(i);
 
-            if (entry == null)
-                continue;
-
             try {
                 if (!checkFilter(entry, req, res)) {
                     if (expiry != null && entry.hasValue()) {
@@ -2155,7 +2093,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 null,
                                 entryProcessorMap,
                                 dhtFut,
-                                completionCb,
                                 req,
                                 res,
                                 replicate,
@@ -2204,7 +2141,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 rmvKeys,
                                 entryProcessorMap,
                                 dhtFut,
-                                completionCb,
                                 req,
                                 res,
                                 replicate,
@@ -2331,7 +2267,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 rmvKeys,
                 entryProcessorMap,
                 dhtFut,
-                completionCb,
                 req,
                 res,
                 replicate,
@@ -2404,14 +2339,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /**
      * Updates locked entries one-by-one.
      *
-     * @param node Originating node.
+     * @param nearNode Originating node.
      * @param hasNear {@code True} if originating node has near cache.
      * @param req Update request.
      * @param res Update response.
      * @param locked Locked entries.
      * @param ver Assigned update version.
      * @param dhtFut Optional DHT future.
-     * @param completionCb Completion callback to invoke when DHT future is completed.
      * @param replicate Whether DR is enabled for that cache.
      * @param taskName Task name.
      * @param expiry Expiry policy.
@@ -2420,14 +2354,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @throws GridCacheEntryRemovedException Should be never thrown.
      */
     private UpdateSingleResult updateSingle(
-        ClusterNode node,
+        ClusterNode nearNode,
         boolean hasNear,
         GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res,
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
@@ -2440,10 +2373,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
 
-        boolean readersOnly = false;
-
         boolean intercept = ctx.config().getInterceptor() != null;
 
+        AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
+
         // Avoid iterator creation.
         for (int i = 0; i < req.size(); i++) {
             KeyCacheObject k = req.key(i);
@@ -2455,18 +2388,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             try {
                 GridDhtCacheEntry entry = locked.get(i);
 
-                if (entry == null)
-                    continue;
-
                 GridCacheVersion newConflictVer = req.conflictVersion(i);
                 long newConflictTtl = req.conflictTtl(i);
                 long newConflictExpireTime = req.conflictExpireTime(i);
 
                 assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
 
-                boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), entry.partition(),
-                    req.topologyVersion());
-
                 Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
 
                 Collection<UUID> readers = null;
@@ -2474,46 +2401,39 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 if (checkReaders) {
                     readers = entry.readers();
-                    filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+                    filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id()));
                 }
 
                 GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                     ver,
-                    node.id(),
+                    nearNode.id(),
                     locNodeId,
                     op,
                     writeVal,
                     req.invokeArguments(),
-                    (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()))
-                        && writeThrough() && !req.skipStore(),
+                    writeThrough() && !req.skipStore(),
                     !req.skipStore(),
                     sndPrevVal || req.returnValue(),
                     req.keepBinary(),
                     expiry,
-                    true,
-                    true,
-                    primary,
-                    ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
+                    /*event*/true,
+                    /*metrics*/true,
+                    /*primary*/true,
+                    /*verCheck*/false,
                     topVer,
                     req.filter(),
-                    replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+                    replicate ? DR_PRIMARY : DR_NONE,
                     newConflictTtl,
                     newConflictExpireTime,
                     newConflictVer,
-                    true,
+                    /*conflictResolve*/true,
                     intercept,
                     req.subjectId(),
                     taskName,
-                    null,
-                    null,
+                    /*prevVal*/null,
+                    /*updateCntr*/null,
                     dhtFut);
 
-                if (dhtFut == null && !F.isEmpty(filteredReaders)) {
-                    dhtFut = createDhtFuture(ver, req, res, completionCb, true);
-
-                    readersOnly = true;
-                }
-
                 if (dhtFut != null) {
                     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
                         GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
@@ -2525,20 +2445,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         EntryProcessor<Object, Object, Object> entryProcessor = null;
 
-                        if (!readersOnly) {
-                            dhtFut.addWriteEntry(entry,
-                                updRes.newValue(),
-                                entryProcessor,
-                                updRes.newTtl(),
-                                updRes.conflictExpireTime(),
-                                newConflictVer,
-                                sndPrevVal,
-                                updRes.oldValue(),
-                                updRes.updateCounter());
-                        }
+                        dhtFut.addWriteEntry(
+                            affAssignment,
+                            entry,
+                            updRes.newValue(),
+                            entryProcessor,
+                            updRes.newTtl(),
+                            updRes.conflictExpireTime(),
+                            newConflictVer,
+                            sndPrevVal,
+                            updRes.oldValue(),
+                            updRes.updateCounter());
 
                         if (!F.isEmpty(filteredReaders))
-                            dhtFut.addNearWriteEntries(filteredReaders,
+                            dhtFut.addNearWriteEntries(
+                                filteredReaders,
                                 entry,
                                 updRes.newValue(),
                                 entryProcessor,
@@ -2553,8 +2474,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 if (hasNear) {
-                    if (primary && updRes.sendToDht()) {
-                        if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) {
+                    if (updRes.sendToDht()) {
+                        if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
                             // If put the same value as in request then do not need to send it back.
                             if (op == TRANSFORM || writeVal != updRes.newValue()) {
                                 res.addNearValue(i,
@@ -2566,13 +2487,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());
 
                             if (updRes.newValue() != null) {
-                                IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
+                                IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
 
                                 assert f == null : f;
                             }
                         }
-                        else if (F.contains(readers, node.id())) // Reader became primary or backup.
-                            entry.removeReader(node.id(), req.messageId());
+                        else if (F.contains(readers, nearNode.id())) // Reader became primary or backup.
+                            entry.removeReader(nearNode.id(), req.messageId());
                         else
                             res.addSkippedIndex(i);
                     }
@@ -2594,7 +2515,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) {
                         if (retVal == null)
-                            retVal = new GridCacheReturn(node.isLocal());
+                            retVal = new GridCacheReturn(nearNode.isLocal());
 
                         retVal.addEntryProcessResult(ctx,
                             k,
@@ -2610,7 +2531,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         CacheObject ret = updRes.oldValue();
 
                         retVal = new GridCacheReturn(ctx,
-                            node.isLocal(),
+                            nearNode.isLocal(),
                             req.keepBinary(),
                             req.returnValue() ? ret : null,
                             updRes.success());
@@ -2630,13 +2551,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param firstEntryIdx Index of the first entry in the request keys collection.
      * @param entries Entries to update.
      * @param ver Version to set.
-     * @param node Originating node.
+     * @param nearNode Originating node.
      * @param writeVals Write values.
      * @param putMap Values to put.
      * @param rmvKeys Keys to remove.
      * @param entryProcessorMap Entry processors.
      * @param dhtFut DHT update future if has backups.
-     * @param completionCb Completion callback to invoke when DHT future is completed.
      * @param req Request.
      * @param res Response.
      * @param replicate Whether replication is enabled.
@@ -2652,13 +2572,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final int firstEntryIdx,
         final List<GridDhtCacheEntry> entries,
         final GridCacheVersion ver,
-        final ClusterNode node,
+        final ClusterNode nearNode,
         @Nullable final List<CacheObject> writeVals,
         @Nullable final Map<KeyCacheObject, CacheObject> putMap,
         @Nullable final Collection<KeyCacheObject> rmvKeys,
         @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
-        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
         final boolean replicate,
@@ -2681,17 +2600,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             GridCacheOperation op;
 
             if (putMap != null) {
-                // If fast mapping, filter primary keys for write to store.
-                Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ?
-                    F.view(putMap, new P1<CacheObject>() {
-                        @Override public boolean apply(CacheObject key) {
-                            return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
-                        }
-                    }) :
-                    putMap;
-
                 try {
-                    ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
+                    ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
                         @Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) {
                             return F.t(v, ver);
                         }
@@ -2704,17 +2614,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 op = UPDATE;
             }
             else {
-                // If fast mapping, filter primary keys for write to store.
-                Collection<KeyCacheObject> storeKeys = req.fastMap() ?
-                    F.view(rmvKeys, new P1<Object>() {
-                        @Override public boolean apply(Object key) {
-                            return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
-                        }
-                    }) :
-                    rmvKeys;
-
                 try {
-                    ctx.store().removeAll(null, storeKeys);
+                    ctx.store().removeAll(null, rmvKeys);
                 }
                 catch (CacheStorePartialUpdateException e) {
                     storeErr = e;
@@ -2725,6 +2626,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             boolean intercept = ctx.config().getInterceptor() != null;
 
+            AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
+
             // Avoid iterator creation.
             for (int i = 0; i < entries.size(); i++) {
                 GridDhtCacheEntry entry = entries.get(i);
@@ -2747,21 +2650,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     assert writeVal != null || op == DELETE : "null write value found.";
 
-                    boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(),
-                        entry.partition(),
-                        req.topologyVersion());
-
                     Collection<UUID> readers = null;
                     Collection<UUID> filteredReaders = null;
 
                     if (checkReaders) {
                         readers = entry.readers();
-                        filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+                        filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id()));
                     }
 
                     GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                         ver,
-                        node.id(),
+                        nearNode.id(),
                         locNodeId,
                         op,
                         writeVal,
@@ -2773,11 +2672,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         expiry,
                         /*event*/true,
                         /*metrics*/true,
-                        primary,
-                        ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
+                        /*primary*/true,
+                        /*verCheck*/false,
                         topVer,
                         null,
-                        replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+                        replicate ? DR_PRIMARY : DR_NONE,
                         CU.TTL_NOT_CHANGED,
                         CU.EXPIRE_TIME_CALCULATE,
                         null,
@@ -2811,30 +2710,25 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     batchRes.addDeleted(entry, updRes, entries);
 
-                    if (dhtFut == null && !F.isEmpty(filteredReaders)) {
-                        dhtFut = createDhtFuture(ver, req, res, completionCb, true);
-
-                        batchRes.readersOnly(true);
-                    }
-
                     if (dhtFut != null) {
                         EntryProcessor<Object, Object, Object> entryProcessor =
                             entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
 
-                        if (!batchRes.readersOnly()) {
-                            dhtFut.addWriteEntry(entry,
-                                writeVal,
-                                entryProcessor,
-                                updRes.newTtl(),
-                                CU.EXPIRE_TIME_CALCULATE,
-                                null,
-                                sndPrevVal,
-                                updRes.oldValue(),
-                                updRes.updateCounter());
-                        }
+                        dhtFut.addWriteEntry(
+                            affAssignment,
+                            entry,
+                            writeVal,
+                            entryProcessor,
+                            updRes.newTtl(),
+                            CU.EXPIRE_TIME_CALCULATE,
+                            null,
+                            sndPrevVal,
+                            updRes.oldValue(),
+                            updRes.updateCounter());
 
                         if (!F.isEmpty(filteredReaders))
-                            dhtFut.addNearWriteEntries(filteredReaders,
+                            dhtFut.addNearWriteEntries(
+                                filteredReaders,
                                 entry,
                                 writeVal,
                                 entryProcessor,
@@ -2843,30 +2737,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (hasNear) {
-                        if (primary) {
-                            if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) {
-                                int idx = firstEntryIdx + i;
-
-                                if (req.operation() == TRANSFORM) {
-                                    res.addNearValue(idx,
-                                        writeVal,
-                                        updRes.newTtl(),
-                                        CU.EXPIRE_TIME_CALCULATE);
-                                }
-                                else
-                                    res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+                        if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
+                            int idx = firstEntryIdx + i;
 
-                                if (writeVal != null || entry.hasValue()) {
-                                    IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
-
-                                    assert f == null : f;
-                                }
+                            if (req.operation() == TRANSFORM) {
+                                res.addNearValue(idx,
+                                    writeVal,
+                                    updRes.newTtl(),
+                                    CU.EXPIRE_TIME_CALCULATE);
                             }
-                            else if (readers.contains(node.id())) // Reader became primary or backup.
-                                entry.removeReader(node.id(), req.messageId());
                             else
-                                res.addSkippedIndex(firstEntryIdx + i);
+                                res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+
+                            if (writeVal != null || entry.hasValue()) {
+                                IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
+
+                                assert f == null : f;
+                            }
                         }
+                        else if (readers.contains(nearNode.id())) // Reader became primary or backup.
+                            entry.removeReader(nearNode.id(), req.messageId());
                         else
                             res.addSkippedIndex(firstEntryIdx + i);
                     }
@@ -2879,7 +2769,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         catch (IgniteCheckedException e) {
-            res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e, ctx);
+            res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e);
         }
 
         if (storeErr != null) {
@@ -2888,7 +2778,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             for (Object failedKey : storeErr.failedKeys())
                 failed.add(ctx.toCacheKeyObject(failedKey));
 
-            res.addFailedKeys(failed, storeErr.getCause(), ctx);
+            res.addFailedKeys(failed, storeErr.getCause());
         }
 
         return dhtFut;
@@ -2910,23 +2800,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             KeyCacheObject key = req.key(0);
 
             while (true) {
-                try {
-                    GridDhtCacheEntry entry = entryExx(key, topVer);
+                GridDhtCacheEntry entry = entryExx(key, topVer);
 
-                    GridUnsafe.monitorEnter(entry);
+                GridUnsafe.monitorEnter(entry);
 
-                    if (entry.obsolete())
-                        GridUnsafe.monitorExit(entry);
-                    else
-                        return Collections.singletonList(entry);
-                }
-                catch (GridDhtInvalidPartitionException e) {
-                    // Ignore invalid partition exception in CLOCK ordering mode.
-                    if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
-                        return Collections.singletonList(null);
-                    else
-                        throw e;
-                }
+                if (entry.obsolete())
+                    GridUnsafe.monitorExit(entry);
+                else
+                    return Collections.singletonList(entry);
             }
         }
         else {
@@ -2934,18 +2815,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             while (true) {
                 for (int i = 0; i < req.size(); i++) {
-                    try {
-                        GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
+                    GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
 
-                        locked.add(entry);
-                    }
-                    catch (GridDhtInvalidPartitionException e) {
-                        // Ignore invalid partition exception in CLOCK ordering mode.
-                        if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
-                            locked.add(null);
-                        else
-                            throw e;
-                    }
+                    locked.add(entry);
                 }
 
                 boolean retry = false;
@@ -3055,7 +2927,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /**
      * @param req Request to remap.
      */
-    private void remapToNewPrimary(GridNearAtomicAbstractUpdateRequest req) {
+    void remapToNewPrimary(GridNearAtomicAbstractUpdateRequest req) {
         assert req.writeSynchronizationMode() == FULL_ASYNC : req;
 
         if (log.isDebugEnabled())
@@ -3098,7 +2970,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             drPutVals = null;
         }
 
-        final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
+        GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
             ctx,
             this,
             ctx.config().getWriteSynchronizationMode(),
@@ -3127,43 +2999,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      *
      * @param writeVer Write version.
      * @param updateReq Update request.
-     * @param updateRes Update response.
-     * @param completionCb Completion callback to invoke when future is completed.
-     * @param force If {@code true} then creates future without optimizations checks.
-     * @return Backup update future or {@code null} if there are no backups.
+     * @return Backup update future.
      */
-    @Nullable private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
+    private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq,
-        GridNearAtomicUpdateResponse updateRes,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
-        boolean force
+        GridNearAtomicAbstractUpdateRequest updateReq
     ) {
-        if (!force) {
-            if (updateReq.fastMap())
-                return null;
-
-            AffinityTopologyVersion topVer = updateReq.topologyVersion();
-
-            Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(ctx.cacheId(), topVer);
-
-            // We are on primary node for some key.
-            assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer +
-                ctx.kernalContext().discovery().discoCache(topVer) + ']';
-
-            if (nodes.size() == 1) {
-                if (log.isDebugEnabled())
-                    log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " +
-                        "[topVer=" + topVer + ", updateReq=" + updateReq + ']');
-
-                return null;
-            }
-        }
-
         if (updateReq.size() == 1)
-            return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+            return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
         else
-            return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+            return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
     }
 
     /**
@@ -3172,13 +3017,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
         if (msgLog.isDebugEnabled()) {
-            msgLog.debug("Received near atomic update request [futId=" + req.futureVersion() +
-                ", writeVer=" + req.updateVersion() +
+            msgLog.debug("Received near atomic update request [futId=" + req.futureId() +
                 ", node=" + nodeId + ']');
         }
 
-        req.nodeId(ctx.localNodeId());
-
         updateAllAsyncInternal(nodeId, req, updateReplyClos);
     }
 
@@ -3189,20 +3031,41 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @SuppressWarnings("unchecked")
     private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
         if (msgLog.isDebugEnabled())
-            msgLog.debug("Received near atomic update response " +
-                "[futId=" + res.futureVersion() + ", node=" + nodeId + ']');
+            msgLog.debug("Received near atomic update response [futId=" + res.futureId() + ", node=" + nodeId + ']');
 
         res.nodeId(ctx.localNodeId());
 
         GridNearAtomicAbstractUpdateFuture fut =
-            (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+            (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
 
         if (fut != null)
-            fut.onResult(nodeId, res, false);
-
+            fut.onPrimaryResponse(nodeId, res, false);
         else
             U.warn(msgLog, "Failed to find near update future for update response (will ignore) " +
-                "[futId" + res.futureVersion() + ", node=" + nodeId + ", res=" + res + ']');
+                "[futId=" + res.futureId() + ", node=" + nodeId + ", res=" + res + ']');
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param checkReq Request.
+     */
+    private void processCheckUpdateRequest(UUID nodeId, GridNearAtomicCheckUpdateRequest checkReq) {
+        /*
+         * Message is processed in the same stripe, so primary already processed update request. It is possible
+         * response was not sent if operation result was empty. Near node will get original response or this one.
+         */
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+            nodeId,
+            checkReq.futureId(),
+            checkReq.partition(),
+            false,
+            false);
+
+        GridCacheReturn ret = new GridCacheReturn(false, true);
+
+        res.returnValue(ret);
+
+        sendNearUpdateReply(nodeId, res);
     }
 
     /**
@@ -3210,20 +3073,28 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param req Dht atomic update request.
      */
     private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) {
+        assert Thread.currentThread().getName().startsWith("sys-stripe-") : Thread.currentThread().getName();
+
         if (msgLog.isDebugEnabled()) {
-            msgLog.debug("Received DHT atomic update request [futId=" + req.futureVersion() +
+            msgLog.debug("Received DHT atomic update request [futId=" + req.futureId() +
                 ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
         }
 
+        assert req.partition() >= 0 : req;
+
         GridCacheVersion ver = req.writeVersion();
 
-        // Always send update reply.
-        GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion(),
-            ctx.deploymentEnabled());
+        GridDhtAtomicNearResponse nearRes = null;
 
-        res.partition(req.partition());
+        if (req.nearNodeId() != null) {
+            nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
+                req.partition(),
+                req.nearFutureId(),
+                nodeId,
+                req.flags());
+        }
 
-        Boolean replicate = ctx.isDrEnabled();
+        boolean replicate = ctx.isDrEnabled();
 
         boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null;
 
@@ -3305,48 +3176,208 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 // Ignore.
             }
             catch (IgniteCheckedException e) {
-                res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
+                IgniteCheckedException err =
+                    new IgniteCheckedException("Failed to update key on backup node: " + key, e);
+
+                if (nearRes != null)
+                    nearRes.addFailedKey(key, err);
+
+                U.error(log, "Failed to update key on backup node: " + key, e);
+            }
+        }
+
+        GridDhtAtomicUpdateResponse dhtRes = null;
+
+        if (isNearEnabled(cacheCfg)) {
+            List<KeyCacheObject> nearEvicted =
+                ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes);
+
+            if (nearEvicted != null) {
+                dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+                    req.partition(),
+                    req.futureId(),
+                    ctx.deploymentEnabled());
+
+                dhtRes.nearEvicted(nearEvicted);
             }
         }
 
-        if (isNearEnabled(cacheCfg))
-            ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res);
+        if (nearRes != null)
+            sendDhtNearResponse(req, nearRes);
 
+        if (dhtRes == null && req.replyWithoutDelay()) {
+            dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+                req.partition(),
+                req.futureId(),
+                ctx.deploymentEnabled());
+        }
+
+        if (dhtRes != null)
+            sendDhtPrimaryResponse(nodeId, req, dhtRes);
+        else
+            sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
+    }
+
+    /**
+     * @param nodeId Primary node ID (destination of the response).
+     * @param req Request; used only to build log messages.
+     * @param dhtRes Response to send.
+     */
+    private void sendDhtPrimaryResponse(UUID nodeId,
+        GridDhtAtomicAbstractUpdateRequest req,
+        GridDhtAtomicUpdateResponse dhtRes) {
         try {
-            if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) {
-                ctx.io().send(nodeId, res, ctx.ioPolicy());
+            ctx.io().send(nodeId, dhtRes, ctx.ioPolicy());
 
-                if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() +
-                        ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
-                }
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Sent DHT response [futId=" + req.futureId() +
+                    ", nearFutId=" + req.nearFutureId() +
+                    ", writeVer=" + req.writeVersion() +
+                    ", node=" + nodeId + ']');
             }
-            else {
-                if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureVersion() +
-                        ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
-                }
+        }
+        catch (ClusterTopologyCheckedException ignored) {
+            U.warn(msgLog, "Failed to send DHT response, node left [futId=" + req.futureId() +
+                ", nearFutId=" + req.nearFutureId() +
+                ", node=" + nodeId + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(msgLog, "Failed to send DHT response [futId=" + req.futureId() +
+                ", nearFutId=" + req.nearFutureId() +
+                ", node=" + nodeId +
+                ", res=" + dhtRes + ']', e);
+        }
+    }
+
+    /**
+     * @param part Partition; passed to the timeout object created when a new response buffer is started.
+     * @param primaryId ID of the primary node whose pending future IDs are buffered.
+     * @param futId Future ID to append to the buffered deferred response.
+     */
+    private void sendDeferredUpdateResponse(int part, UUID primaryId, long futId) {
+        Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get();
+
+        GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId);
+
+        if (msg == null) {
+            msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
+                new GridLongList(DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE));
+
+            if (DEFERRED_UPDATE_RESPONSE_TIMEOUT > 0) {
+                GridTimeoutObject timeoutSnd = new DeferredUpdateTimeout(part, primaryId);
+
+                msg.timeoutSender(timeoutSnd);
+
+                ctx.time().addTimeoutObject(timeoutSnd);
+            }
+
+            resMap.put(primaryId, msg);
+        }
+
+        GridLongList futIds = msg.futureIds();
+
+        assert futIds.size() < DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE : futIds.size();
+
+        futIds.add(futId);
+
+        if (futIds.size() >= DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE) {
+            resMap.remove(primaryId);
+
+            sendDeferredUpdateResponse(primaryId, msg);
+        }
+    }
+
+    /**
+     * @param primaryId ID of the primary node the message is sent to.
+     * @param msg Deferred response to send; its timeout object, if set, is removed before sending.
+     */
+    private void sendDeferredUpdateResponse(UUID primaryId, GridDhtAtomicDeferredUpdateResponse msg) {
+        try {
+            GridTimeoutObject timeoutSnd = msg.timeoutSender();
+
+            if (timeoutSnd != null)
+                ctx.time().removeTimeoutObject(timeoutSnd);
+
+            ctx.io().send(primaryId, msg, ctx.ioPolicy());
+
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureIds() +
+                    ", node=" + primaryId + ']');
+            }
+        }
+        catch (ClusterTopologyCheckedException ignored) {
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Failed to send deferred DHT update response, node left [" +
+                    "futIds=" + msg.futureIds() + ", node=" + primaryId + ']');
+            }
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send deferred DHT update response to remote node [" +
+                "futIds=" + msg.futureIds() + ", node=" + primaryId + ']', e);
+        }
+    }
+
+    /**
+     * @param req Request.
+     * @param nearRes Response to send.
+     */
+    private void sendDhtNearResponse(final GridDhtAtomicAbstractUpdateRequest req, GridDhtAtomicNearResponse nearRes) {
+        try {
+            ClusterNode node = ctx.discovery().node(req.nearNodeId());
+
+            if (node == null)
+                throw new ClusterTopologyCheckedException("Node failed: " + req.nearNodeId());
 
-                // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
-                sendDeferredUpdateResponse(nodeId, req.futureVersion());
+            if (node.isLocal())
+                processDhtAtomicNearResponse(node.id(), nearRes);
+            else
+                ctx.io().send(node, nearRes, ctx.ioPolicy());
+
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Sent DHT near response [futId=" + req.futureId() +
+                    ", nearFutId=" + req.nearFutureId() +
+                    ", writeVer=" + req.writeVersion() +
+                    ", node=" + req.nearNodeId() + ']');
             }
         }
         catch (ClusterTopologyCheckedException ignored) {
-            U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureVersion() +
-                ", node=" + req.nodeId() + ']');
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Failed to send DHT near response, node left [futId=" + req.futureId() +
+                    ", nearFutId=" + req.nearFutureId() +
+                    ", node=" + req.nearNodeId() + ']');
+            }
         }
         catch (IgniteCheckedException e) {
-            U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureVersion() +
-                ", node=" + nodeId +  ", res=" + res + ']', e);
+            U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() +
+                ", nearFutId=" + req.nearFutureId() +
+                ", node=" + req.nearNodeId() +
+                ", res=" + nearRes + ']', e);
         }
     }
 
     /**
-     * @param nodeId Node ID to send message to.
-     * @param ver Version to ack.
+     * @param nodeId Node ID.
+     * @param res Response.
      */
-    private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) {
-        deferredUpdateMsgSnd.sendDeferredAckMessage(nodeId, ver);
+    private void processDhtAtomicNearResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+        GridNearAtomicAbstractUpdateFuture updateFut =
+            (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
+
+        if (updateFut != null) {
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Received DHT atomic near response [futId=" + res.futureId() +
+                    ", node=" + nodeId + ']');
+            }
+
+            updateFut.onDhtResponse(nodeId, res);
+        }
+        else {
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Failed to find future for DHT atomic near response [futId=" + res.futureId() +
+                    ", node=" + nodeId +
+                    ", res=" + res + ']');
+            }
+        }
     }
 
     /**
@@ -3355,18 +3386,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("unchecked")
     private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
-        GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+        GridDhtAtomicAbstractUpdateFuture updateFut =
+            (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
 
         if (updateFut != null) {
             if (msgLog.isDebugEnabled()) {
-                msgLog.debug("Received DHT atomic update response [futId=" + res.futureVersion() +
-                    ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']');
+                msgLog.debug("Received DHT atomic update response [futId=" + res.futureId() +
+                        ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']');
             }
 
-            updateFut.onResult(nodeId, res);
+            updateFut.onDhtResponse(nodeId, res);
         }
         else {
-            U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureVersion() +
+            U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureId() +
                 ", node=" + nodeId + ", res=" + res + ']');
         }
     }
@@ -3377,19 +3409,25 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("unchecked")
     private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) {
-        for (GridCacheVersion ver : res.futureVersions()) {
-            GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(ver);
+        GridLongList futIds = res.futureIds();
+
+        assert futIds != null && futIds.size() > 0 : futIds;
+
+        for (int i = 0; i < futIds.size(); i++) {
+            Long id = futIds.get(i);
+
+            GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(id);
 
             if (updateFut != null) {
                 if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Received DHT atomic deferred update response [futId=" + ver +
+                    msgLog.debug("Received DHT atomic deferred update response [futId=" + id +
                         ", writeVer=" + res + ", node=" + nodeId + ']');
                 }
 
-                updateFut.onResult(nodeId);
+                updateFut.onDeferredResponse(nodeId);
             }
             else {
-                U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + ver +
+                U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + id +
                     ", nodeId=" + nodeId + ", res=" + res + ']');
             }
         }
@@ -3404,16 +3442,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             ctx.io().send(nodeId, res, ctx.ioPolicy());
 
             if (msgLog.isDebugEnabled())
-                msgLog.debug("Sent near update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']');
+                msgLog.debug("Sent near update response [futId=" + res.futureId() + ", node=" + nodeId + ']');
         }
         catch (ClusterTopologyCheckedException ignored) {
             if (msgLog.isDebugEnabled()) {
-                msgLog.debug("Failed to send near update response [futId=" + res.futureVersion() +
+                msgLog.debug("Failed to send near update response [futId=" + res.futureId() +
                     ", node=" + nodeId + ']');
             }
         }
         catch (IgniteCheckedException e) {
-            U.error(msgLog, "Failed to send near update response [futId=" + res.futureVersion() +
+            U.error(msgLog, "Failed to send near update response [futId=" + res.futureId() +
                 ", node=" + nodeId + ", res=" + res + ']', e);
         }
     }
@@ -3482,9 +3520,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private GridDhtAtomicAbstractUpdateFuture dhtFut;
 
         /** */
-        private boolean readersOnly;
-
-        /** */
         private GridCacheReturn invokeRes;
 
         /**
@@ -3537,20 +3572,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) {
             this.dhtFut = dhtFut;
         }
-
-        /**
-         * @return {@code True} if only readers (not backups) should be updated.
-         */
-        private boolean readersOnly() {
-            return readersOnly;
-        }
-
-        /**
-         * @param readersOnly {@code True} if only readers (not backups) should be updated.
-         */
-        private void readersOnly(boolean readersOnly) {
-            this.readersOnly = readersOnly;
-        }
     }
 
     /**
@@ -3569,4 +3590,71 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             return Collections.emptyList();
         }
     }
+
+    /**
+     * Named {@link CI2} over a near atomic update request and its near update response; declares no members.
+     */
+    interface UpdateReplyClosure extends CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> {
+        // No-op.
+    }
+
+    /**
+     * Timeout object that, when it fires, sends the deferred update response still buffered for a primary node.
+     */
+    private class DeferredUpdateTimeout implements GridTimeoutObject, Runnable {
+        /** Partition; passed as the index to the striped executor when the timeout fires. */
+        private final int part;
+
+        /** ID of the primary node the buffered response is sent to. */
+        private final UUID primaryId;
+
+        /** Timeout object ID, derived from the primary node ID. */
+        private final IgniteUuid id;
+
+        /** End time: creation time plus {@code DEFERRED_UPDATE_RESPONSE_TIMEOUT}. */
+        private final long endTime;
+
+        /**
+         * @param part Partition.
+         * @param primaryId Primary ID.
+         */
+        DeferredUpdateTimeout(int part, UUID primaryId) {
+            this.part = part;
+            this.primaryId = primaryId;
+
+            endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT;
+
+            id = IgniteUuid.fromUuid(primaryId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get();
+
+            GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId);
+
+            if (msg != null && msg.timeoutSender() == this) {
+                msg.timeoutSender(null);
+
+                resMap.remove(primaryId);
+
+                sendDeferredUpdateResponse(primaryId, msg);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            ctx.kernalContext().getStripedExecutorService().execute(part, this);
+        }
+    }
 }


Mime
View raw message