ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: ignite-4705
Date Tue, 21 Feb 2017 13:01:54 GMT
ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ba085859
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ba085859
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ba085859

Branch: refs/heads/ignite-4705
Commit: ba0858595298f88de8eeecd048cfe85e0c330a8f
Parents: 431d510
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Feb 20 18:25:30 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Feb 21 15:56:40 2017 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |  28 +-
 .../connection/GridClientNioTcpConnection.java  |   2 +-
 .../processors/cache/GridCacheProcessor.java    |   1 -
 .../dht/atomic/GridDhtAtomicCache.java          | 290 ++++++++++----
 .../GridDhtAtomicDeferredUpdateResponse.java    |  52 ++-
 .../dht/atomic/GridDhtAtomicNearResponse.java   |  34 +-
 .../GridNearAtomicAbstractUpdateFuture.java     | 173 ++++++++-
 .../GridNearAtomicAbstractUpdateRequest.java    |   3 +
 .../GridNearAtomicSingleUpdateFuture.java       | 122 ++----
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 389 +++++++++----------
 .../internal/util/ipc/IpcToNioAdapter.java      |   4 +-
 .../nio/GridConnectionBytesVerifyFilter.java    |   7 +-
 .../util/nio/GridNioAsyncNotifyFilter.java      |   7 +-
 .../internal/util/nio/GridNioCodecFilter.java   |   9 +-
 .../util/nio/GridNioEmbeddedFuture.java         |   7 +
 .../ignite/internal/util/nio/GridNioFilter.java |  10 +-
 .../internal/util/nio/GridNioFilterAdapter.java |   7 +-
 .../internal/util/nio/GridNioFilterChain.java   |  15 +-
 .../util/nio/GridNioFinishedFuture.java         |   5 -
 .../ignite/internal/util/nio/GridNioFuture.java |   7 -
 .../internal/util/nio/GridNioFutureImpl.java    |  18 +-
 .../ignite/internal/util/nio/GridNioServer.java |  58 ++-
 .../internal/util/nio/GridNioSession.java       |   5 +-
 .../internal/util/nio/GridNioSessionImpl.java   |   8 +-
 .../util/nio/GridNioSessionMetaKey.java         |   5 +-
 .../util/nio/GridShmemCommunicationClient.java  |   6 +-
 .../util/nio/GridTcpNioCommunicationClient.java |  13 +-
 .../internal/util/nio/SessionWriteRequest.java  |   7 -
 .../internal/util/nio/ssl/GridNioSslFilter.java |  12 +-
 .../util/nio/ssl/GridNioSslHandler.java         |  24 +-
 .../communication/tcp/TcpCommunicationSpi.java  |   2 +-
 .../nio/GridNioEmbeddedFutureSelfTest.java      |   2 +-
 .../util/future/nio/GridNioFutureSelfTest.java  |  23 +-
 .../nio/impl/GridNioFilterChainSelfTest.java    |  12 +-
 34 files changed, 818 insertions(+), 549 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 50876dd..537d495 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -183,21 +183,21 @@ public class MessageCodeGenerator {
 
 //        gen.generateAll(true);
 
-        gen.generateAndWrite(UpdateErrors.class);
-        gen.generateAndWrite(GridDhtAtomicNearResponse.class);
-        //gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class);
-        gen.generateAndWrite(GridDhtAtomicAbstractUpdateRequest.class);
-        gen.generateAndWrite(GridNearAtomicAbstractSingleUpdateRequest.class);
         gen.generateAndWrite(GridDhtAtomicDeferredUpdateResponse.class);
-        gen.generateAndWrite(GridDhtAtomicSingleUpdateRequest.class);
-        gen.generateAndWrite(GridDhtAtomicUpdateRequest.class);
-        gen.generateAndWrite(GridDhtAtomicUpdateResponse.class);
-        gen.generateAndWrite(GridNearAtomicFullUpdateRequest.class);
-        gen.generateAndWrite(GridNearAtomicSingleUpdateFilterRequest.class);
-        gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
-        gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
-        gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
-        gen.generateAndWrite(GridNearAtomicMappingResponse.class);
+//        gen.generateAndWrite(UpdateErrors.class);
+//        gen.generateAndWrite(GridDhtAtomicNearResponse.class);
+//        gen.generateAndWrite(GridDhtAtomicAbstractUpdateRequest.class);
+//        gen.generateAndWrite(GridNearAtomicAbstractSingleUpdateRequest.class);
+//        gen.generateAndWrite(GridDhtAtomicSingleUpdateRequest.class);
+//        gen.generateAndWrite(GridDhtAtomicUpdateRequest.class);
+//        gen.generateAndWrite(GridDhtAtomicUpdateResponse.class);
+//        gen.generateAndWrite(GridNearAtomicFullUpdateRequest.class);
+//        gen.generateAndWrite(GridNearAtomicSingleUpdateFilterRequest.class);
+//        gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
+//        gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
+//        gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
+//        gen.generateAndWrite(GridNearAtomicMappingResponse.class);
+        //gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class);
 
 //        gen.generateAndWrite(GridMessageCollection.class);
 //        gen.generateAndWrite(DataStreamerEntry.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index 8937504..d3a30fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -229,7 +229,7 @@ public class GridClientNioTcpConnection extends GridClientConnection {
             GridNioFuture<?> sslHandshakeFut = null;
 
             if (sslCtx != null) {
-                sslHandshakeFut = new GridNioFutureImpl<>();
+                sslHandshakeFut = new GridNioFutureImpl<>(null);
 
                 meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 17f9c2a..7e166e4 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -856,7 +856,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         if (!ctx.clientNode() && !ctx.isDaemon())
             addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000));
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d3b0f5d..30d58cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -58,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
-import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -81,6 +80,8 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -100,10 +101,10 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT;
@@ -137,15 +138,25 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
 
     /** */
-    static final boolean IGNITE_ATOMIC_SND_MAPPING_TO_NEAR = IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_SND_MAPPING_TO_NEAR", false);
+    static final boolean IGNITE_ATOMIC_SND_MAPPING_TO_NEAR =
+        IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_SND_MAPPING_TO_NEAR", false);
+
+    /** */
+    private static final boolean IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK =
+        IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK", false);
+
+    /** */
+    private final ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>> defRes =
+        new ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>>() {
+            @Override protected Map<UUID, GridDhtAtomicDeferredUpdateResponse> initialValue() {
+                return new HashMap<>();
+            }
+        };
 
     /** Update reply closure. */
     @GridToStringExclude
     private UpdateReplyClosure updateReplyClos;
 
-    /** Pending */
-    private GridDeferredAckMessageSender deferredUpdateMsgSnd;
-
     /** */
     private GridNearAtomicCache<K, V> near;
 
@@ -236,54 +247,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         // TODO IGNITE-4705.
         log.info("Atomic cache start [name=" + name() +
             ", mode=" + configuration().getWriteSynchronizationMode() +
-            ", IGNITE_ATOMIC_SND_MAPPING_TO_NEAR=" + IGNITE_ATOMIC_SND_MAPPING_TO_NEAR + ']');
-
-        deferredUpdateMsgSnd = new GridDeferredAckMessageSender<Long>(ctx.time(), ctx.closures()) {
-            @Override public int getTimeout() {
-                return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
-            }
-
-            @Override public int getBufferSize() {
-                return DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE;
-            }
-
-            @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<Long> vers) {
-                GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
-                    vers, ctx.deploymentEnabled());
-
-                try {
-                    ctx.kernalContext().gateway().readLock();
-
-                    try {
-                        ctx.io().send(nodeId, msg, ctx.ioPolicy());
-
-                        if (msgLog.isDebugEnabled()) {
-                            msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureIds() +
-                                ", node=" + nodeId + ']');
-                        }
-                    }
-                    finally {
-                        ctx.kernalContext().gateway().readUnlock();
-                    }
-                }
-                catch (IllegalStateException ignored) {
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("Failed to send deferred DHT update response, node is stopping [" +
-                            "futIds=" + msg.futureIds() + ", node=" + nodeId + ']');
-                    }
-                }
-                catch (ClusterTopologyCheckedException ignored) {
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("Failed to send deferred DHT update response, node left [" +
-                            "futIds=" + msg.futureIds() + ", node=" + nodeId + ']');
-                    }
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send deferred DHT update response to remote node [" +
-                        "futIds=" + msg.futureIds() + ", node=" + nodeId + ']', e);
-                }
-            }
-        };
+            ", IGNITE_ATOMIC_SND_MAPPING_TO_NEAR=" + IGNITE_ATOMIC_SND_MAPPING_TO_NEAR +
+            ", IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK=" + IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK + ']');
 
         CacheMetricsImpl m = new CacheMetricsImpl(ctx);
 
@@ -476,7 +441,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override public void stop() {
-        deferredUpdateMsgSnd.stop();
+        // TODO 4705: need send deferred response?
     }
 
     /**
@@ -3220,7 +3185,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
 
         if (fut != null)
-            fut.onResult(nodeId, res, false);
+            fut.onPrimaryResponse(nodeId, res, false);
 
         else
             U.warn(msgLog, "Failed to find near update future for update response (will ignore) " +
@@ -3243,6 +3208,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             new GridDhtAtomicNearResponse(ctx.cacheId(),
                 req.partition(),
                 req.nearFutureId(),
+                nodeId,
                 req.dhtNodes(),
                 req.flags()) : null;
 
@@ -3349,19 +3315,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             dhtRes.nearEvicted(nearEvicted);
         }
 
-        final boolean RES_AFTER_ACK = false;
-
         if (nearRes != null) {
-            if (RES_AFTER_ACK)
+            if (IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK)
                 sendDhtNearResponse(nodeId, req, nearRes);
             else {
                 sendDhtNearResponse(null, req, nearRes);
 
-                sendDeferredUpdateResponse(nodeId, req.futureId());
+                sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
             }
         }
         else
-            sendDeferredUpdateResponse(nodeId, req.futureId());
+            sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
 
         if (dhtRes != null)
             sendDhtPrimaryResponse(nodeId, req, dhtRes);
@@ -3399,6 +3363,182 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     *
+     */
+    private class DeferredResponseClosure implements IgniteInClosure<IgniteException>, Runnable {
+        /** */
+        private final int part;
+
+        /** */
+        private final UUID primaryId;
+
+        /** */
+        private final long futId;
+
+        /**
+         * @param part Partition ID.
+         * @param primaryId Primary ID.
+         * @param futId Future ID.
+         */
+        DeferredResponseClosure(int part, UUID primaryId, long futId) {
+            this.part = part;
+            this.primaryId = primaryId;
+            this.futId = futId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            sendDeferredUpdateResponse(part, primaryId, futId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(IgniteException e) {
+            ctx.kernalContext().getStripedExecutorService().execute(part, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private class DeferredUpdateTimeout implements GridTimeoutObject, Runnable {
+        /** */
+        private final int part;
+
+        /** */
+        private final UUID primaryId;
+
+        /** */
+        private final IgniteUuid id;
+
+        /** */
+        private final long endTime;
+
+        /**
+         * @param part Partition.
+         * @param primaryId Primary ID.
+         */
+        DeferredUpdateTimeout(int part, UUID primaryId) {
+            this.part = part;
+            this.primaryId = primaryId;
+
+            endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT;
+
+            id = IgniteUuid.fromUuid(primaryId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get();
+
+            GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId);
+
+            if (msg.timeoutSender() == this) {
+                msg.timeoutSender(null);
+
+                resMap.remove(primaryId);
+
+                sendDeferredUpdateResponse(primaryId, msg);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            ctx.kernalContext().getStripedExecutorService().execute(part, this);
+        }
+    }
+
+    /**
+     * @param part Partition.
+     * @param primaryId Primary ID.
+     * @param futId Future ID.
+     */
+    private void sendDeferredUpdateResponse(int part, UUID primaryId, long futId) {
+        Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get();
+
+        GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId);
+
+        if (msg == null) {
+            msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
+                new GridLongList(DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE));
+
+            if (DEFERRED_UPDATE_RESPONSE_TIMEOUT > 0) {
+                GridTimeoutObject timeoutSnd = new DeferredUpdateTimeout(part, primaryId);
+
+                msg.timeoutSender(timeoutSnd);
+
+                ctx.time().addTimeoutObject(timeoutSnd);
+            }
+
+            resMap.put(primaryId, msg);
+        }
+
+        GridLongList futIds = msg.futureIds();
+
+        assert futIds.size() < DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE : futIds.size();
+
+        futIds.add(futId);
+
+        if (futIds.size() == DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE) {
+            resMap.remove(primaryId);
+
+            sendDeferredUpdateResponse(primaryId, msg);
+        }
+    }
+
+    /**
+     * @param primaryId Primary ID.
+     * @param msg Message.
+     */
+    private void sendDeferredUpdateResponse(UUID primaryId, GridDhtAtomicDeferredUpdateResponse msg) {
+        try {
+            ctx.kernalContext().gateway().readLock();
+
+            GridTimeoutObject timeoutSnd = msg.timeoutSender();
+
+            if (timeoutSnd != null)
+                ctx.time().removeTimeoutObject(timeoutSnd);
+
+            try {
+                ctx.io().send(primaryId, msg, ctx.ioPolicy());
+
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureIds() +
+                        ", node=" + primaryId + ']');
+                }
+            }
+            finally {
+                ctx.kernalContext().gateway().readUnlock();
+            }
+        }
+        catch (IllegalStateException ignored) {
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Failed to send deferred DHT update response, node is stopping [" +
+                    "futIds=" + msg.futureIds() + ", node=" + primaryId + ']');
+            }
+        }
+        catch (ClusterTopologyCheckedException ignored) {
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Failed to send deferred DHT update response, node left [" +
+                    "futIds=" + msg.futureIds() + ", node=" + primaryId + ']');
+            }
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send deferred DHT update response to remote node [" +
+                "futIds=" + msg.futureIds() + ", node=" + primaryId + ']', e);
+        }
+    }
+
+    /**
      * @param primaryId Primary node ID.
      * @param req Request.
      * @param nearRes Response to send.
@@ -3413,11 +3553,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 throw new ClusterTopologyCheckedException("Node left: " + req.nearNodeId());
 
             if (primaryId != null) {
-                ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy(), new IgniteInClosure<IgniteException>() {
-                    @Override public void apply(IgniteException e) {
-                        sendDeferredUpdateResponse(primaryId, req.futureId());
-                    }
-                });
+                ctx.gridIO().send(node,
+                    TOPIC_CACHE,
+                    nearRes,
+                    ctx.ioPolicy(),
+                    new DeferredResponseClosure(req.partition(), primaryId, req.futureId()));
             }
             else
                 ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy());
@@ -3443,14 +3583,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
-     * @param nodeId Node ID to send message to.
-     * @param futId ID to ack.
-     */
-    private void sendDeferredUpdateResponse(UUID nodeId, long futId) {
-        deferredUpdateMsgSnd.sendDeferredAckMessage(nodeId, futId);
-    }
-
-    /**
      * @param nodeId Node ID.
      * @param res Response.
      */
@@ -3489,7 +3621,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     ", node=" + nodeId + ']');
             }
 
-            updateFut.onResult(nodeId, res);
+            updateFut.onDhtResponse(nodeId, res);
         }
         else {
             U.warn(msgLog, "Failed to find update future DHT atomic near response [futId=" + res.futureId() +
@@ -3526,7 +3658,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("unchecked")
     private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) {
-        for (Long id : res.futureIds()) {
+        GridLongList futIds = res.futureIds();
+
+        assert futIds != null && futIds.size() > 0 : futIds;
+
+        for (int i = 0; i < futIds.size(); i++) {
+            Long id = futIds.get(i);
+
             GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(id);
 
             if (updateFut != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index 671d516..4e9ee86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -19,17 +19,17 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Deferred dht atomic update response.
@@ -42,13 +42,11 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
     public static final int CACHE_MSG_IDX = nextIndexId();
 
     /** ACK future versions. */
-    @GridDirectCollection(Long.class)
-    private Collection<Long> futIds;
+    private GridLongList futIds;
 
-    /** {@inheritDoc} */
-    @Override public int lookupIndex() {
-        return CACHE_MSG_IDX;
-    }
+    /** */
+    @GridDirectTransient
+    private GridTimeoutObject timeoutSnd;
 
     /**
      * Empty constructor required by {@link Externalizable}
@@ -62,25 +60,41 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
      *
      * @param cacheId Cache ID.
      * @param futIds Future IDs.
-     * @param addDepInfo Deployment info.
      */
-    public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection<Long> futIds, boolean addDepInfo) {
-        assert !F.isEmpty(futIds);
-
+    public GridDhtAtomicDeferredUpdateResponse(int cacheId, GridLongList futIds) {
         this.cacheId = cacheId;
         this.futIds = futIds;
-        this.addDepInfo = addDepInfo;
+        this.timeoutSnd = timeoutSnd;
+    }
+
+    /**
+     * @param timeoutSnd Callback sending response on timeout.
+     */
+    void timeoutSender(@Nullable GridTimeoutObject timeoutSnd) {
+        this.timeoutSnd = timeoutSnd;
+    }
+
+    /**
+     * @return Callback sending response on timeout.
+     */
+    @Nullable GridTimeoutObject timeoutSender() {
+        return timeoutSnd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
     }
 
     /** {@inheritDoc} */
     @Override public boolean addDeploymentInfo() {
-        return addDepInfo;
+        return false;
     }
 
     /**
      * @return List of ACKed future ids.
      */
-    public Collection<Long> futureIds() {
+    GridLongList futureIds() {
         return futIds;
     }
 
@@ -105,7 +119,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeCollection("futIds", futIds, MessageCollectionItemType.LONG))
+                if (!writer.writeMessage("futIds", futIds))
                     return false;
 
                 writer.incrementState();
@@ -127,7 +141,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
 
         switch (reader.state()) {
             case 3:
-                futIds = reader.readCollection("futIds", MessageCollectionItemType.LONG);
+                futIds = reader.readMessage("futIds");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index c88e6f2..244c22c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -31,7 +31,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
-import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_HAS_RESULT_MASK;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_RESULT_SUCCESS_MASK;
 
 /**
  * TODO IGNITE-4705: no not send mapping if it == affinity?
@@ -50,6 +51,9 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
     private long futId;
 
     /** */
+    private UUID primaryId;
+
+    /** */
     @GridDirectCollection(UUID.class)
     private List<UUID> mapping;
 
@@ -68,18 +72,28 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
     /**
      * @param cacheId Cache ID.
+     * @param partId Partition.
      * @param futId Future ID.
+     * @param primaryId Primary node ID.
      * @param mapping Update mapping.
      * @param flags Flags.
      */
-    public GridDhtAtomicNearResponse(int cacheId, int partId, long futId, List<UUID> mapping, byte flags) {
+    public GridDhtAtomicNearResponse(int cacheId, int partId, long futId, UUID primaryId, List<UUID> mapping, byte flags) {
         this.cacheId = cacheId;
         this.partId = partId;
         this.futId = futId;
+        this.primaryId = primaryId;
         this.mapping = mapping;
         this.flags = flags;
     }
 
+    /**
+     * @return Primary node ID.
+     */
+    public UUID primaryId() {
+        return primaryId;
+    }
+
     /** {@inheritDoc} */
     @Override public int partition() {
         return partId;
@@ -156,7 +170,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 9;
     }
 
     /** {@inheritDoc} */
@@ -225,6 +239,12 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
                 writer.incrementState();
 
+            case 8:
+                if (!writer.writeUuid("primaryId", primaryId))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -281,6 +301,14 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
+            case 8:
+                primaryId = reader.readUuid("primaryId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDhtAtomicNearResponse.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 15075c3..66d392b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.expiry.ExpiryPolicy;
@@ -32,8 +36,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -124,9 +128,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     /** Future ID. */
     protected Long futId;
 
-    /** Completion future for a particular topology version. */
-    protected GridFutureAdapter<Void> topCompleteFut;
-
     /** Operation result. */
     protected GridCacheReturn opRes;
 
@@ -245,7 +246,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
      */
     protected boolean storeFuture() {
-        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+        return syncMode != FULL_ASYNC;
     }
 
     /**
@@ -259,7 +260,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             cache.updateAllAsyncInternal(nodeId, req,
                 new GridDhtAtomicCache.UpdateReplyClosure() {
                     @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
-                        onResult(res.nodeId(), res, false);
+                        onPrimaryResponse(res.nodeId(), res, false);
                     }
                 });
         }
@@ -296,10 +297,18 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @param res Update response.
      * @param nodeErr {@code True} if response was created on node failure.
      */
-    public abstract void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr);
+    public abstract void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr);
 
-    public abstract void onResult(UUID nodeId, GridDhtAtomicNearResponse res);
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    public abstract void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res);
 
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
     public abstract void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res);
 
     /**
@@ -315,7 +324,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
             res.addFailedKeys(req.keys(), e);
 
-            onResult(req.nodeId(), res, true);
+            onPrimaryResponse(req.nodeId(), res, true);
         }
     }
 
@@ -342,4 +351,150 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
         return futId;
     }
+
+    /**
+     *
+     */
+    static class PrimaryRequestState {
+        /** */
+        final GridNearAtomicAbstractUpdateRequest req;
+
+        /** */
+        private Set<UUID> rcvd;
+
+        /** */
+        private Set<UUID> mapping;
+
+        /** */
+        private boolean hasRes;
+
+        /**
+         * @param req Request.
+         */
+        PrimaryRequestState(GridNearAtomicAbstractUpdateRequest req) {
+            assert req != null && req.nodeId() != null : req;
+
+            this.req = req;
+        }
+
+        /**
+         * @param cctx Context.
+         * @param nodeIds DHT nodes.
+         */
+        void initMapping(GridCacheContext cctx, List<UUID> nodeIds) {
+            mapping = U.newHashSet(nodeIds.size());
+
+            for (UUID dhtNodeId : nodeIds) {
+                if (cctx.discovery().node(dhtNodeId) != null)
+                    mapping.add(dhtNodeId);
+            }
+
+            if (rcvd != null)
+                mapping.removeAll(rcvd);
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @return Request if need process primary response, {@code null} otherwise.
+         */
+        @Nullable GridNearAtomicAbstractUpdateRequest processPrimaryResponse(UUID nodeId) {
+            if (req != null && req.nodeId().equals(nodeId) && req.response() == null)
+                return req;
+
+            return null;
+        }
+
+        /**
+         * @param cctx Context.
+         * @param res Response.
+         * @return {@code True} if request processing finished.
+         */
+        boolean onMappingReceived(GridCacheContext cctx, GridNearAtomicMappingResponse res) {
+            if (mapping == null) {
+                initMapping(cctx, res.mapping());
+
+                if (mapping.isEmpty() && hasRes)
+                    return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @return {@code True} if request processing finished.
+         */
+        boolean onNodeLeft(UUID nodeId) {
+            if (mapping != null && mapping.remove(nodeId)) {
+                if (mapping.isEmpty() && hasRes)
+                    return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * TODO 4705: check response for errors.
+         *
+         * @param cctx Context.
+         * @param nodeId Node ID.
+         * @param res Response.
+         * @return {@code True} if request processing finished.
+         */
+        boolean onDhtResponse(GridCacheContext cctx, UUID nodeId, GridDhtAtomicNearResponse res) {
+            if (res.mapping() != null) {
+                // Mapping is sent from dht nodes.
+                if (mapping == null)
+                    initMapping(cctx, res.mapping());
+            }
+            else {
+                // Mapping and result are sent from primary.
+                if (mapping == null) {
+                    if (rcvd == null)
+                        rcvd = new HashSet<>();
+
+                    rcvd.add(nodeId);
+
+                    return false; // Need wait for response from primary.
+                }
+            }
+
+            mapping.remove(nodeId);
+
+            if (res.hasResult())
+                hasRes = true;
+
+            return mapping.isEmpty() && hasRes;
+        }
+
+        /**
+         * @param cctx Context.
+         * @param res Response.
+         * @return {@code True} if request processing finished.
+         */
+        boolean onPrimaryResponse(GridCacheContext cctx, GridNearAtomicUpdateResponse res) {
+            hasRes = true;
+
+            boolean onRes = req.onResponse(res);
+
+            assert onRes;
+
+            if (res.error() != null || res.remapKeys() != null)
+                return true;
+
+            assert res.returnValue() != null : res;
+
+            if (res.mapping() != null)
+                initMapping(cctx, res.mapping());
+            else
+                mapping = Collections.emptySet();
+
+            return mapping.isEmpty();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PrimaryRequestState.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 077c73c..d5c7a9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -65,11 +65,13 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
 
     /**
      * @return Flag indicating whether this is fast-map udpate.
+     * TODO IGNITE-4705
      */
     public abstract boolean fastMap();
 
     /**
      * @return Update version for fast-map request.
+     * TODO IGNITE-4705
      */
     public abstract GridCacheVersion updateVersion();
 
@@ -125,6 +127,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
 
     /**
      * @return Flag indicating whether this request contains primary keys.
+     * TODO IGNITE-4705
      */
     public abstract boolean hasPrimary();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index bc74229..df74d3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -20,10 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
@@ -45,13 +42,11 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -67,14 +62,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Object val;
 
-    /** Current request. */
-    private GridNearAtomicAbstractUpdateRequest req;
-
-    /** */
-    private Set<UUID> rcvd;
-
     /** */
-    private Set<UUID> mapping;
+    private PrimaryRequestState reqState;
 
     /**
      * @param cctx Cache context.
@@ -138,9 +127,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         GridCacheReturn opRes0 = null;
 
         synchronized (mux) {
-            req = this.req != null && this.req.nodeId().equals(nodeId) ? this.req : null;
+            if (reqState == null)
+                return false;
 
-            if (req != null && req.response() == null) {
+            req = reqState.processPrimaryResponse(nodeId);
+
+            if (req != null) {
                 res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
                     nodeId,
                     req.futureId(),
@@ -154,9 +146,10 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 res.addFailedKeys(req.keys(), e);
             }
             else {
-                if (mapping != null && mapping.remove(nodeId)) {
-                    if (mapping.isEmpty() && opRes != null)
-                        opRes0 = opRes;
+                if (reqState.onNodeLeft(nodeId)) {
+                    opRes0 = opRes;
+
+                    assert opRes0 != null;
                 }
             }
         }
@@ -168,7 +161,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                     ", node=" + nodeId + ']');
             }
 
-            onResult(nodeId, res, true);
+            onPrimaryResponse(nodeId, res, true);
         }
         else if (opRes0 != null)
             onDone(opRes0);
@@ -207,21 +200,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         return false;
     }
 
-    /**
-     * @param nodeIds DHT nodes.
-     */
-    private void initMapping(List<UUID> nodeIds) {
-        mapping = U.newHashSet(nodeIds.size());
-
-        for (UUID dhtNodeId : nodeIds) {
-            if (cctx.discovery().node(dhtNodeId) != null)
-                mapping.add(dhtNodeId);
-        }
-
-        if (rcvd != null)
-            mapping.removeAll(rcvd);
-    }
-
     /** {@inheritDoc} */
     @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
         GridCacheReturn opRes0 = null;
@@ -230,11 +208,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             if (futId == null || futId != res.futureId())
                 return;
 
-            if (mapping == null) {
-                initMapping(res.mapping());
+            assert reqState != null;
 
-                if (mapping.isEmpty() && opRes != null)
-                    opRes0 = opRes;
+            if (reqState.onMappingReceived(cctx, res)) {
+                opRes0 = opRes;
+
+                assert opRes0 != null;
             }
         }
 
@@ -243,39 +222,25 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /** {@inheritDoc} */
-    @Override public void onResult(UUID nodeId, GridDhtAtomicNearResponse res) {
+    @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
         GridCacheReturn opRes0 = null;
 
         synchronized (mux) {
             if (futId == null || futId != res.futureId())
                 return;
 
-            if (res.mapping() != null) {
-                // Mapping is sent from dht nodes.
-                if (mapping == null)
-                    initMapping(res.mapping());
-            }
-            else {
-                // Mapping and result are sent from primary.
-                if (mapping == null) {
-                    if (rcvd == null)
-                        rcvd = new HashSet<>();
+            assert reqState != null;
 
-                    rcvd.add(nodeId);
-
-                    return; // Need wait for response from primary.
-                }
-                else
-                    mapping.remove(nodeId);
-            }
-
-            mapping.remove(nodeId);
+            assert reqState.req.nodeId().equals(res.primaryId());
 
             if (opRes == null && res.hasResult())
                 opRes = res.result();
 
-            if (mapping.isEmpty() && opRes != null)
+            if (reqState.onDhtResponse(cctx, nodeId, res)) {
                 opRes0 = opRes;
+
+                assert opRes0 != null;
+            }
         }
 
         if (opRes0 != null)
@@ -284,7 +249,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-    @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+    @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
         GridNearAtomicAbstractUpdateRequest req;
 
         AffinityTopologyVersion remapTopVer = null;
@@ -292,18 +257,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
 
-        GridFutureAdapter<?> fut0 = null;
-
         synchronized (mux) {
             if (futId == null || futId != res.futureId())
                 return;
 
-            if (!this.req.nodeId().equals(nodeId))
-                return;
-
-            req = this.req;
+            req = reqState.processPrimaryResponse(nodeId);
 
-            this.req = null;
+            if (req == null)
+                return;
 
             boolean remapKey = !F.isEmpty(res.remapKeys());
 
@@ -344,12 +305,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 else
                     opRes = ret;
 
-                if (res.mapping() != null)
-                    initMapping(res.mapping());
-                else
-                    mapping = Collections.emptySet();
+                assert reqState != null;
 
-                if (!mapping.isEmpty())
+                if (!reqState.onPrimaryResponse(cctx, res))
                     return;
             }
 
@@ -364,8 +322,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                     X.hasCause(err, ClusterTopologyCheckedException.class) &&
                     storeFuture() &&
                     --remapCnt > 0) {
-                    ClusterTopologyCheckedException topErr =
-                        X.cause(err, ClusterTopologyCheckedException.class);
+                    ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
 
                     if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
                         CachePartialUpdateCheckedException cause =
@@ -386,12 +343,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 opRes0 = opRes;
             }
             else {
-                fut0 = topCompleteFut;
-
-                topCompleteFut = null;
-
                 cctx.mvcc().removeAtomicFuture(futId);
 
+                reqState = null;
                 futId = null;
                 topVer = AffinityTopologyVersion.ZERO;
             }
@@ -407,9 +361,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             updateNear(req, res);
 
         if (remapTopVer != null) {
-            if (fut0 != null)
-                fut0.onDone();
-
             if (!waitTopFut) {
                 onDone(new GridCacheTryPutFailedException());
 
@@ -544,7 +495,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
                 resCnt = 0;
 
-                req = singleReq0;
+                reqState = new PrimaryRequestState(singleReq0);
             }
         }
         catch (Exception e) {
@@ -567,21 +518,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     private Long onFutureDone() {
         Long id0;
 
-        GridFutureAdapter<Void> fut0;
-
         synchronized (mux) {
-            fut0 = topCompleteFut;
-
-            topCompleteFut = null;
-
             id0 = futId;
 
             futId = null;
         }
 
-        if (fut0 != null)
-            fut0.onDone();
-
         return id0;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 4167fab..85cb0e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
@@ -36,7 +35,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -47,10 +45,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtom
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -58,18 +54,13 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 
 /**
  * DHT atomic cache near update future.
  */
 public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFuture {
-    /** Fast map flag. */
-    private final boolean fastMap;
-
     /** Keys */
     private Collection<?> keys;
 
@@ -87,13 +78,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
     /** Mappings if operations is mapped to more than one node. */
     @GridToStringInclude
-    private Map<UUID, GridNearAtomicFullUpdateRequest> mappings;
+    private Map<UUID, PrimaryRequestState> mappings;
 
     /** Keys to remap. */
     private Collection<KeyCacheObject> remapKeys;
 
     /** Not null is operation is mapped to single node. */
-    private GridNearAtomicFullUpdateRequest singleReq;
+    private PrimaryRequestState singleReq;
 
     /**
      * @param cctx Cache context.
@@ -149,8 +140,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         this.vals = vals;
         this.conflictPutVals = conflictPutVals;
         this.conflictRmvVals = conflictRmvVals;
-
-        fastMap = cache.isFastMap(filter, op);
     }
 
     /** {@inheritDoc} */
@@ -164,15 +153,20 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     @Override public boolean onNodeLeft(UUID nodeId) {
         GridNearAtomicUpdateResponse res = null;
 
-        GridNearAtomicFullUpdateRequest req;
+        GridNearAtomicAbstractUpdateRequest req;
 
         synchronized (mux) {
             if (singleReq != null)
-                req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
-            else
-                req = mappings != null ? mappings.get(nodeId) : null;
+                req = singleReq.processPrimaryResponse(nodeId);
+            else {
+                PrimaryRequestState reqState = mappings != null ? mappings.get(nodeId) : null;
+
+                req = reqState != null ? reqState.processPrimaryResponse(nodeId) : null;
+            }
+
+            if (req != null) {
+                assert req.response() == null : req;
 
-            if (req != null && req.response() == null) {
                 res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
                     nodeId,
                     req.futureId(),
@@ -194,7 +188,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     ", node=" + nodeId + ']');
             }
 
-            onResult(nodeId, res, true);
+            onPrimaryResponse(nodeId, res, true);
         }
 
         return false;
@@ -202,30 +196,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
-        // Wait fast-map near atomic update futures in CLOCK mode.
-        if (fastMap) {
-            GridFutureAdapter<Void> fut;
-
-            synchronized (mux) {
-                if (this.topVer != AffinityTopologyVersion.ZERO && this.topVer.compareTo(topVer) < 0) {
-                    if (topCompleteFut == null)
-                        topCompleteFut = new GridFutureAdapter<>();
-
-                    fut = topCompleteFut;
-                }
-                else
-                    fut = null;
-            }
-
-            if (fut != null && isDone()) {
-                fut.onDone();
-
-                return null;
-            }
-
-            return fut;
-        }
-
         return null;
     }
 
@@ -257,18 +227,94 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
     /** {@inheritDoc} */
     @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
-        throw new UnsupportedOperationException();
+        GridCacheReturn opRes0 = null;
+
+        synchronized (mux) {
+            if (futId == null || futId != res.futureId())
+                return;
+
+            PrimaryRequestState reqState;
+
+            if (singleReq != null) {
+                if (singleReq.onMappingReceived(cctx, res)) {
+                    opRes0 = opRes;
+
+                    assert opRes0 != null;
+                }
+            }
+            else {
+                reqState = mappings != null ? mappings.get(nodeId) : null;
+
+                if (reqState != null && reqState.onMappingReceived(cctx, res)) {
+                    assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+                    resCnt++;
+
+                    if (mappings.size() == resCnt) {
+                        opRes0 = opRes;
+
+                        assert opRes0 != null;
+                    }
+                }
+            }
+        }
+
+        if (opRes0 != null)
+            onDone(opRes0);
     }
 
     /** {@inheritDoc} */
-    @Override public void onResult(UUID nodeId, GridDhtAtomicNearResponse res) {
-        throw new UnsupportedOperationException();
+    @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+        GridCacheReturn opRes0 = null;
+
+        synchronized (mux) {
+            if (futId == null || futId != res.futureId())
+                return;
+
+            PrimaryRequestState reqState;
+
+            if (singleReq != null) {
+                assert singleReq.req.nodeId().equals(res.primaryId());
+
+                if (opRes == null && res.hasResult())
+                    opRes = res.result();
+
+                if (singleReq.onDhtResponse(cctx, nodeId, res)) {
+                    opRes0 = opRes;
+
+                    assert opRes0 != null;
+                }
+            }
+            else {
+                reqState = mappings != null ? mappings.get(res.primaryId()) : null;
+
+                if (reqState != null) {
+                    if (opRes == null && res.hasResult())
+                        opRes = res.result();
+
+                    if (reqState.onDhtResponse(cctx, nodeId, res)) {
+                        assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+                        resCnt++;
+
+                        if (mappings.size() == resCnt) {
+                            opRes0 = opRes;
+
+                            assert opRes0 != null;
+                        }
+                    }
+                }
+            }
+        }
+
+        if (opRes0 != null)
+            onDone(opRes0);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-    @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
-        GridNearAtomicFullUpdateRequest req;
+    @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+        GridNearAtomicAbstractUpdateRequest req;
 
         AffinityTopologyVersion remapTopVer = null;
 
@@ -277,29 +323,39 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         boolean rcvAll;
 
-        GridFutureAdapter<?> fut0 = null;
-
         synchronized (mux) {
             if (futId == null || futId != res.futureId())
                 return;
 
             if (singleReq != null) {
-                if (!singleReq.nodeId().equals(nodeId))
-                    return;
-
-                req = singleReq;
+                req = singleReq.processPrimaryResponse(nodeId);
 
-                singleReq = null;
+                if (req == null)
+                    return;
 
-                rcvAll = true;
+                rcvAll = singleReq.onPrimaryResponse(cctx, res);
             }
             else {
-                req = mappings != null ? mappings.get(nodeId) : null;
+                PrimaryRequestState reqState = mappings != null ? mappings.get(nodeId) : null;
 
-                if (req != null && req.onResponse(res)) {
-                    resCnt++;
+                if (reqState == null)
+                    return;
+
+                req = reqState.processPrimaryResponse(nodeId);
+
+                if (req != null) {
+                    if (reqState.onPrimaryResponse(cctx, res)) {
+                        assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+                        resCnt++;
+
+                        rcvAll = mappings.size() == resCnt;
+                    }
+                    else {
+                        assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
 
-                    rcvAll = mappings.size() == resCnt;
+                        rcvAll = false;
+                    }
                 }
                 else
                     return;
@@ -308,8 +364,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             assert req != null && req.topologyVersion().equals(topVer) : req;
 
             if (res.remapKeys() != null) {
-                assert !fastMap || cctx.kernalContext().clientNode();
-
                 if (remapKeys == null)
                     remapKeys = U.newHashSet(res.remapKeys().size());
 
@@ -333,24 +387,22 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 }
             }
             else {
-                if (!req.fastMap() || req.hasPrimary()) {
-                    GridCacheReturn ret = res.returnValue();
-
-                    if (op == TRANSFORM) {
-                        if (ret != null) {
-                            assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
-                            if (ret.value() != null) {
-                                if (opRes != null)
-                                    opRes.mergeEntryProcessResults(ret);
-                                else
-                                    opRes = ret;
-                            }
+                GridCacheReturn ret = res.returnValue();
+
+                if (op == TRANSFORM) {
+                    if (ret != null) {
+                        assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+                        if (ret.value() != null) {
+                            if (opRes != null)
+                                opRes.mergeEntryProcessResults(ret);
+                            else
+                                opRes = ret;
                         }
                     }
-                    else
-                        opRes = ret;
                 }
+                else
+                    opRes = ret;
             }
 
             if (rcvAll) {
@@ -365,8 +417,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                         X.hasCause(err, ClusterTopologyCheckedException.class) &&
                         storeFuture() &&
                         --remapCnt > 0) {
-                        ClusterTopologyCheckedException topErr =
-                            X.cause(err, ClusterTopologyCheckedException.class);
+                        ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
 
                         if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
                             CachePartialUpdateCheckedException cause =
@@ -394,10 +445,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     opRes0 = opRes;
                 }
                 else {
-                    fut0 = topCompleteFut;
-
-                    topCompleteFut = null;
-
                     cctx.mvcc().removeAtomicFuture(futId);
 
                     futId = null;
@@ -414,12 +461,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         if (rcvAll && nearEnabled) {
             if (mappings != null) {
-                for (GridNearAtomicFullUpdateRequest req0 : mappings.values()) {
-                    GridNearAtomicUpdateResponse res0 = req0.response();
+                for (PrimaryRequestState reqState : mappings.values()) {
+                    GridNearAtomicUpdateResponse res0 = reqState.req.response();
 
-                    assert res0 != null : req0;
+                    assert res0 != null : reqState;
 
-                    updateNear(req0, res0);
+                    updateNear(reqState.req, res0);
                 }
             }
             else if (!nodeErr)
@@ -427,9 +474,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         }
 
         if (remapTopVer != null) {
-            if (fut0 != null)
-                fut0.onDone();
-
             if (!waitTopFut) {
                 onDone(new GridCacheTryPutFailedException());
 
@@ -483,10 +527,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @param req Update request.
      * @param res Update response.
      */
-    private void updateNear(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) {
+    private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
         assert nearEnabled;
 
-        if (res.remapKeys() != null || !req.hasPrimary())
+        if (res.remapKeys() != null)
             return;
 
         GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
@@ -558,13 +602,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      *
      * @param mappings Mappings to send.
      */
-    private void doUpdate(Map<UUID, GridNearAtomicFullUpdateRequest> mappings) {
+    private void doUpdate(Map<UUID, PrimaryRequestState> mappings) {
         UUID locNodeId = cctx.localNodeId();
 
-        GridNearAtomicFullUpdateRequest locUpdate = null;
+        GridNearAtomicAbstractUpdateRequest locUpdate = null;
 
         // Send messages to remote nodes first, then run local update.
-        for (GridNearAtomicFullUpdateRequest req : mappings.values()) {
+        for (PrimaryRequestState reqState : mappings.values()) {
+            GridNearAtomicAbstractUpdateRequest req = reqState.req;
+
             if (locNodeId.equals(req.nodeId())) {
                 assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
                     ", req=" + req + ']';
@@ -598,7 +644,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
                 new GridDhtAtomicCache.UpdateReplyClosure() {
                     @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
-                        onResult(res.nodeId(), res, false);
+                        onPrimaryResponse(res.nodeId(), res, false);
                     }
                 });
         }
@@ -620,12 +666,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     void map(AffinityTopologyVersion topVer,
         Long futId,
         @Nullable Collection<KeyCacheObject> remapKeys) {
-        if (true) {
-            onDone(new IgniteCheckedException("Failed"));
-
-            return;
-        }
-
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -635,37 +675,20 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             return;
         }
 
-        GridCacheVersion updVer;
-
-        // Assign version on near node in CLOCK ordering mode even if fastMap is false.
-        if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
-// TODO IGNITE-4705 (get rid of updVer everywhere).
-//            updVer = this.updVer;
-//
-//            if (updVer == null) {
-//                updVer = cctx.versions().next(topVer);
-//
-//                if (log.isDebugEnabled())
-//                    log.debug("Assigned fast-map version for update on near node: " + updVer);
-//            }
-        }
-        else
-            updVer = null;
-
         Exception err = null;
-        GridNearAtomicFullUpdateRequest singleReq0 = null;
-        Map<UUID, GridNearAtomicFullUpdateRequest> mappings0 = null;
+        PrimaryRequestState singleReq0 = null;
+        Map<UUID, PrimaryRequestState> mappings0 = null;
 
         int size = keys.size();
 
         try {
-            if (size == 1 && !fastMap) {
+            if (size == 1) {
                 assert remapKeys == null || remapKeys.size() == 1;
 
                 singleReq0 = mapSingleUpdate(topVer, futId, null);
             }
             else {
-                Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = mapUpdate(topNodes,
+                Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes,
                     topVer,
                     futId,
                     null,
@@ -674,16 +697,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 if (pendingMappings.size() == 1)
                     singleReq0 = F.firstValue(pendingMappings);
                 else {
-                    if (syncMode == PRIMARY_SYNC) {
-                        mappings0 = U.newHashMap(pendingMappings.size());
-
-                        for (GridNearAtomicFullUpdateRequest req : pendingMappings.values()) {
-                            if (req.hasPrimary())
-                                mappings0.put(req.nodeId(), req);
-                        }
-                    }
-                    else
-                        mappings0 = pendingMappings;
+                    mappings0 = pendingMappings;
 
                     assert !mappings0.isEmpty() || size == 0 : this;
                 }
@@ -715,7 +729,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         // Optimize mapping for single key.
         if (singleReq0 != null)
-            mapSingle(singleReq0.nodeId(), singleReq0);
+            mapSingle(singleReq0.req.nodeId(), singleReq0.req);
         else {
             assert mappings0 != null;
 
@@ -732,21 +746,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     private Long onFutureDone() {
         Long id0;
 
-        GridFutureAdapter<Void> fut0;
-
         synchronized (mux) {
-            fut0 = topCompleteFut;
-
-            topCompleteFut = null;
-
             id0 = futId;
 
             futId = null;
         }
 
-        if (fut0 != null)
-            fut0.onDone();
-
         return id0;
     }
 
@@ -760,7 +765,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @throws Exception If failed.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private Map<UUID, GridNearAtomicFullUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+    private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes,
         AffinityTopologyVersion topVer,
         Long futId,
         @Nullable GridCacheVersion updVer,
@@ -780,7 +785,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (conflictRmvVals != null)
             conflictRmvValsIt = conflictRmvVals.iterator();
 
-        Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+        Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size());
 
         // Create mappings first, then send messages.
         for (Object key : keys) {
@@ -835,55 +840,43 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             else
                 val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
 
-            List<ClusterNode> affNodes = mapKey(cacheKey, topVer);
+            ClusterNode primary = cctx.affinity().primaryByKey(cacheKey, topVer);
 
-            if (affNodes.isEmpty())
+            if (primary == null)
                 throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                     "(all partition nodes left the grid).");
 
-            int i = 0;
-
-            for (int n = 0; n < affNodes.size(); n++) {
-                ClusterNode affNode = affNodes.get(n);
-
-                if (affNode == null)
-                    throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                        "(all partition nodes left the grid).");
-
-                UUID nodeId = affNode.id();
-
-                GridNearAtomicFullUpdateRequest mapped = pendingMappings.get(nodeId);
-
-                if (mapped == null) {
-                    mapped = new GridNearAtomicFullUpdateRequest(
-                        cctx.cacheId(),
-                        nodeId,
-                        futId,
-                        fastMap,
-                        updVer,
-                        topVer,
-                        topLocked,
-                        syncMode,
-                        op,
-                        retval,
-                        expiryPlc,
-                        invokeArgs,
-                        filter,
-                        subjId,
-                        taskNameHash,
-                        skipStore,
-                        keepBinary,
-                        cctx.kernalContext().clientNode(),
-                        cctx.deploymentEnabled(),
-                        keys.size());
-
-                    pendingMappings.put(nodeId, mapped);
-                }
+            UUID nodeId = primary.id();
 
-                mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+            PrimaryRequestState mapped = pendingMappings.get(nodeId);
 
-                i++;
+            if (mapped == null) {
+                mapped = new PrimaryRequestState(new GridNearAtomicFullUpdateRequest(
+                    cctx.cacheId(),
+                    nodeId,
+                    futId,
+                    false,
+                    updVer,
+                    topVer,
+                    topLocked,
+                    syncMode,
+                    op,
+                    retval,
+                    expiryPlc,
+                    invokeArgs,
+                    filter,
+                    subjId,
+                    taskNameHash,
+                    skipStore,
+                    keepBinary,
+                    cctx.kernalContext().clientNode(),
+                    cctx.deploymentEnabled(),
+                    keys.size()));
+
+                pendingMappings.put(nodeId, mapped);
             }
+
+            mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, true);
         }
 
         return pendingMappings;
@@ -896,7 +889,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @return Request.
      * @throws Exception If failed.
      */
-    private GridNearAtomicFullUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer,
         Long futId,
         @Nullable GridCacheVersion updVer) throws Exception {
         Object key = F.first(keys);
@@ -961,7 +954,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             cctx.cacheId(),
             primary.id(),
             futId,
-            fastMap,
+            false,
             updVer,
             topVer,
             topLocked,
@@ -986,23 +979,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             conflictVer,
             true);
 
-        return req;
-    }
-
-    /**
-     * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
-     * node and send updates in parallel to all participating nodes.
-     *
-     * @param key Key to map.
-     * @param topVer Topology version to map.
-     * @return Collection of nodes to which key is mapped.
-     */
-    private List<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer) {
-        GridCacheAffinityManager affMgr = cctx.affinity();
-
-        // If we can send updates in parallel - do it.
-        return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) :
-            Collections.singletonList(affMgr.primaryByKey(key, topVer));
+        return new PrimaryRequestState(req);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index d108b56..a4a7838 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -24,6 +24,7 @@ import java.nio.ByteOrder;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.nio.GridNioFilter;
 import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
@@ -36,6 +37,7 @@ import org.apache.ignite.internal.util.nio.GridNioServerListener;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionImpl;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
@@ -201,7 +203,7 @@ public class IpcToNioAdapter<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut, IgniteInClosure<IgniteException> ackC) {
             assert ses == IpcToNioAdapter.this.ses;
 
             return send((Message)msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
index 7987d3d..f110cf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.util.nio;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 
 /**
  * Verifies that first bytes received in accepted (incoming)
@@ -73,9 +75,10 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter {
     @Override public GridNioFuture<?> onSessionWrite(
         GridNioSession ses,
         Object msg,
-        boolean fut
+        boolean fut,
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
-        return proceedSessionWrite(ses, msg, fut);
+        return proceedSessionWrite(ses, msg, fut, ackC);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
index 40c87cb..56c9772 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.util.nio;
 
 import java.util.concurrent.Executor;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.util.worker.GridWorkerPool;
+import org.apache.ignite.lang.IgniteInClosure;
 
 /**
  * Enables multithreaded notification of session opened, message received and session closed events.
@@ -110,9 +112,10 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter {
     @Override public GridNioFuture<?> onSessionWrite(
         GridNioSession ses,
         Object msg,
-        boolean fut
+        boolean fut,
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
-        return proceedSessionWrite(ses, msg, fut);
+        return proceedSessionWrite(ses, msg, fut, ackC);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
index 343e625..b81086a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.util.nio;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteInClosure;
 
 /**
  * Filter that transforms byte buffers to user-defined objects and vice-versa
@@ -82,16 +84,17 @@ public class GridNioCodecFilter extends GridNioFilterAdapter {
     @Override public GridNioFuture<?> onSessionWrite(
         GridNioSession ses,
         Object msg,
-        boolean fut
+        boolean fut,
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         // No encoding needed in direct mode.
         if (directMode)
-            return proceedSessionWrite(ses, msg, fut);
+            return proceedSessionWrite(ses, msg, fut, ackC);
 
         try {
             ByteBuffer res = parser.encode(ses, msg);
 
-            return proceedSessionWrite(ses, res, fut);
+            return proceedSessionWrite(ses, res, fut, ackC);
         }
         catch (IOException e) {
             throw new GridNioException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba085859/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java
index be77d39..eab4909 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java
@@ -31,6 +31,13 @@ public class GridNioEmbeddedFuture<R> extends GridNioFutureImpl<R> {
     private static final long serialVersionUID = 0L;
 
     /**
+     *
+     */
+    public GridNioEmbeddedFuture() {
+        super(null);
+    }
+
+    /**
      * Callback to notify that future is finished.
      * This method must delegate to {@link #onDone(GridNioFuture, Throwable)} method.
      *


Mime
View raw message