ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/3] ignite git commit: ignite-4705
Date Thu, 16 Feb 2017 15:45:50 GMT
ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c5c5eb5a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c5c5eb5a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c5c5eb5a

Branch: refs/heads/ignite-4705
Commit: c5c5eb5ade63a64ccf8c193db77978c4e297fc19
Parents: 7287a93
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Feb 16 15:07:54 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Feb 16 15:23:12 2017 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |  30 +-
 .../communication/GridIoMessageFactory.java     |  10 +-
 .../processors/cache/GridCacheMessage.java      |   4 +-
 .../cache/GridDeferredAckMessageSender.java     |  11 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |  49 ++-
 .../GridDhtAtomicAbstractUpdateRequest.java     | 135 +++++++-
 .../dht/atomic/GridDhtAtomicCache.java          | 320 ++++++++++++-------
 .../dht/atomic/GridDhtAtomicNearResponse.java   | 268 ++++++++++++++++
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |  24 +-
 .../GridDhtAtomicSingleUpdateRequest.java       |  56 ++--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   8 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  98 +++---
 .../dht/atomic/GridDhtAtomicUpdateResponse.java |   7 +
 .../GridNearAtomicAbstractUpdateFuture.java     |   3 +-
 .../dht/atomic/GridNearAtomicDhtResponse.java   | 222 -------------
 .../GridNearAtomicSingleUpdateFuture.java       |  61 +++-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   2 +-
 .../atomic/GridNearAtomicUpdateResponse.java    |  62 +++-
 .../distributed/dht/atomic/UpdateErrors.java    | 187 +++++++++++
 .../distributed/near/GridNearAtomicCache.java   |  17 +-
 .../cache/IgniteGetAndPutBenchmark.java         |   2 +-
 .../cache/IgniteGetAndPutTxBenchmark.java       |   2 +-
 22 files changed, 1081 insertions(+), 497 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 16b1e01..6636bf2 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -46,17 +46,18 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicDhtResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -181,19 +182,20 @@ public class MessageCodeGenerator {
 
 //        gen.generateAll(true);
 
-//        gen.generateAndWrite(GridNearAtomicDhtResponse.class);
-//        gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class);
-//        gen.generateAndWrite(GridDhtAtomicAbstractUpdateRequest.class);
-//        gen.generateAndWrite(GridNearAtomicAbstractSingleUpdateRequest.class);
-//        gen.generateAndWrite(GridDhtAtomicDeferredUpdateResponse.class);
-//        gen.generateAndWrite(GridDhtAtomicSingleUpdateRequest.class);
-//        gen.generateAndWrite(GridDhtAtomicUpdateRequest.class);
-//        gen.generateAndWrite(GridDhtAtomicUpdateResponse.class);
-//        gen.generateAndWrite(GridNearAtomicFullUpdateRequest.class);
-//        gen.generateAndWrite(GridNearAtomicSingleUpdateFilterRequest.class);
-//        gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
-//        gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
-//        gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
+        gen.generateAndWrite(UpdateErrors.class);
+        gen.generateAndWrite(GridDhtAtomicNearResponse.class);
+        //gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class);
+        gen.generateAndWrite(GridDhtAtomicAbstractUpdateRequest.class);
+        gen.generateAndWrite(GridNearAtomicAbstractSingleUpdateRequest.class);
+        gen.generateAndWrite(GridDhtAtomicDeferredUpdateResponse.class);
+        gen.generateAndWrite(GridDhtAtomicSingleUpdateRequest.class);
+        gen.generateAndWrite(GridDhtAtomicUpdateRequest.class);
+        gen.generateAndWrite(GridDhtAtomicUpdateResponse.class);
+        gen.generateAndWrite(GridNearAtomicFullUpdateRequest.class);
+        gen.generateAndWrite(GridNearAtomicSingleUpdateFilterRequest.class);
+        gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
+        gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
+        gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
 
 //        gen.generateAndWrite(GridMessageCollection.class);
 //        gen.generateAndWrite(DataStreamerEntry.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 769a615..5ed46ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -67,15 +67,16 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicDhtResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -174,8 +175,13 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -46:
+                msg = new UpdateErrors();
+
+                break;
+
             case -45:
-                msg = new GridNearAtomicDhtResponse();
+                msg = new GridDhtAtomicNearResponse();
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 0646d5a..b9fb56a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -501,7 +501,7 @@ public abstract class GridCacheMessage implements Message {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    protected final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col,
+    public final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col,
         GridCacheContext ctx) throws IgniteCheckedException {
         if (col == null)
             return;
@@ -553,7 +553,7 @@ public abstract class GridCacheMessage implements Message {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    protected final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col,
+    public final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col,
         GridCacheContext ctx,
         ClassLoader ldr)
         throws IgniteCheckedException

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
index 8df883a..37ecc79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -21,7 +21,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -41,16 +40,16 @@ public abstract class GridDeferredAckMessageSender<T> {
     private GridTimeoutProcessor time;
 
     /** Closure processor. */
-    public GridClosureProcessor closure;
+    public GridClosureProcessor c;
 
     /**
      * @param time Time.
-     * @param closure Closure.
+     * @param c Closure.
      */
     public GridDeferredAckMessageSender(GridTimeoutProcessor time,
-        GridClosureProcessor closure) {
+        GridClosureProcessor c) {
         this.time = time;
-        this.closure = closure;
+        this.c = c;
     }
 
     /**
@@ -151,7 +150,7 @@ public abstract class GridDeferredAckMessageSender<T> {
         /** {@inheritDoc} */
         @Override public void onTimeout() {
             if (guard.compareAndSet(false, true)) {
-                closure.runLocalSafe(new Runnable() {
+                c.runLocalSafe(new Runnable() {
                     @Override public void run() {
                         writeLock().lock();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 10d1c4b..d494d98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -93,9 +93,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     /** Continuous query closures. */
     private Collection<CI1<Boolean>> cntQryClsrs;
 
-    /** */
-    private final boolean waitForExchange;
-
     /** Response count. */
     private volatile int resCnt;
 
@@ -113,14 +110,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes) {
         this.cctx = cctx;
-
-        futId = cctx.mvcc().atomicFutureId();
         this.updateReq = updateReq;
         this.completionCb = completionCb;
         this.updateRes = updateRes;
         this.writeVer = writeVer;
 
-        waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
+        futId = cctx.mvcc().atomicFutureId();
 
         if (log == null) {
             msgLog = cctx.shared().atomicMessageLogger();
@@ -130,6 +125,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /** {@inheritDoc} */
     @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+        boolean waitForExchange = !updateReq.topologyLocked();
+
         if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
             return this;
 
@@ -160,7 +157,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @param updateCntr Partition update counter.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    final void addWriteEntry(GridDhtCacheEntry entry,
+    final void addWriteEntry(
+        UUID nearNodeId,
+        GridDhtCacheEntry entry,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
@@ -190,7 +189,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
                 if (updateReq == null) {
                     updateReq = createRequest(
-                        node,
+                        node.id(),
+                        nearNodeId,
                         futId,
                         writeVer,
                         syncMode,
@@ -236,7 +236,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @param ttl TTL for near cache update (optional).
      * @param expireTime Expire time for near cache update (optional).
      */
-    final void addNearWriteEntries(Collection<UUID> readers,
+    final void addNearWriteEntries(
+        UUID nearNodeId,
+        Collection<UUID> readers,
         GridDhtCacheEntry entry,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
@@ -259,7 +261,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     continue;
 
                 updateReq = createRequest(
-                    node,
+                    node.id(),
+                    nearNodeId,
                     futId,
                     writeVer,
                     syncMode,
@@ -352,9 +355,21 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * Sends requests to remote nodes.
      */
     final void map() {
+        boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
+
         if (!F.isEmpty(mappings)) {
+            List<UUID> dhtNodes = null;
+
+            if (fullSync) {
+                dhtNodes = new ArrayList<>(mappings.size());
+
+                dhtNodes.addAll(mappings.keySet());
+            }
+
             for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
                 try {
+                    req.dhtNodes(dhtNodes);
+
                     cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
                     if (msgLog.isDebugEnabled()) {
@@ -383,7 +398,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
         // Send response right away if no ACKs from backup is required.
         // Backups will send ACKs anyway, future will be completed after all backups have replied.
-        if (updateReq.writeSynchronizationMode() != FULL_SYNC)
+        if (!fullSync)
             completionCb.apply(updateReq, updateRes);
     }
 
@@ -400,7 +415,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /**
-     * @param node Node.
+     * @param nodeId Node ID.
+     * @param nearNodeId Near node ID.
      * @param futId Future ID.
      * @param writeVer Update version.
      * @param syncMode Write synchronization mode.
@@ -411,7 +427,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @return Request.
      */
     protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
-        ClusterNode node,
+        UUID nodeId,
+        UUID nearNodeId,
         long futId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
@@ -449,9 +466,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                 for (CI1<Boolean> clsr : cntQryClsrs)
                     clsr.apply(suc);
             }
-
-            if (updateReq.writeSynchronizationMode() == FULL_SYNC)
-                completionCb.apply(updateReq, updateRes);
+//
+//            if (updateReq.writeSynchronizationMode() == FULL_SYNC)
+//                completionCb.apply(updateReq, updateRes);
 
             return true;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 7aa440d..3edbf8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -18,10 +18,13 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.UUID;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -29,6 +32,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -46,6 +52,16 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     @GridDirectTransient
     private boolean onRes;
 
+    /** */
+    private UUID nearNodeId;
+
+    /** */
+    private long nearFutId;
+
+    /** */
+    @GridDirectCollection(UUID.class)
+    private List<UUID> dhtNodes;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -58,10 +74,35 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
      *
      * @param cacheId Cache ID.
      * @param nodeId Node ID.
+     * @param nearNodeId Near node ID.
+     * @param nearFutId Future ID on near node.
      */
-    protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId) {
+    protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId, UUID nearNodeId, long nearFutId) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
+        this.nearNodeId = nearNodeId;
+        this.nearFutId = nearFutId;
+    }
+
+    /**
+     * @return Near node ID.
+     */
+    public UUID nearNodeId() {
+        return nearNodeId;
+    }
+
+    /**
+     * @param dhtNodes DHT nodes.
+     */
+    public void dhtNodes(List<UUID> dhtNodes) {
+        this.dhtNodes = dhtNodes;
+    }
+
+    /**
+     * @return DHT nodes.
+     */
+    public List<UUID> dhtNodes() {
+        return dhtNodes;
     }
 
     /** {@inheritDoc} */
@@ -166,11 +207,18 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     public abstract int taskNameHash();
 
     /**
-     * @return Version assigned on primary node.
+     * @return Future ID on primary node.
      */
     public abstract long futureId();
 
     /**
+     * @return Future ID on near node.
+     */
+    public final long nearFutureId() {
+        return nearFutId;
+    }
+
+    /**
      * @return Write version.
      */
     public abstract GridCacheVersion writeVersion();
@@ -284,4 +332,87 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
      * @return Optional arguments for entry processor.
      */
     @Nullable public abstract Object[] invokeArguments();
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 6;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeCollection("dhtNodes", dhtNodes, MessageCollectionItemType.UUID))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeLong("nearFutId", nearFutId))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeUuid("nearNodeId", nearNodeId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                dhtNodes = reader.readCollection("dhtNodes", MessageCollectionItemType.UUID);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                nearFutId = reader.readLong("nearFutId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                nearNodeId = reader.readUuid("nearNodeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtAtomicAbstractUpdateRequest.class);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 87ac54b..3b81ee7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -55,7 +55,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
@@ -93,14 +92,13 @@ import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -112,6 +110,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
@@ -211,17 +210,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
             @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
-                if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
-                    assert req.writeSynchronizationMode() != FULL_ASYNC : req;
-
-                    // Always send reply in CLOCK ordering mode.
-                    sendNearUpdateReply(res.nodeId(), res);
-
-                    return;
-                }
-
-                // Request should be for primary keys only in PRIMARY ordering mode.
-                assert req.hasPrimary() : req;
+//                if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
+//                    assert req.writeSynchronizationMode() != FULL_ASYNC : req;
+//
+//                    // Always send reply in CLOCK ordering mode.
+//                    sendNearUpdateReply(res.nodeId(), res);
+//
+//                    return;
+//                }
+//
+//                // Request should be for primary keys only in PRIMARY ordering mode.
+//                assert req.hasPrimary() : req;
 
                 if (req.writeSynchronizationMode() != FULL_ASYNC)
                     sendNearUpdateReply(res.nodeId(), res);
@@ -422,14 +421,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicDhtResponse.class, new CI2<UUID, GridNearAtomicDhtResponse>() {
-            @Override public void apply(UUID uuid, GridNearAtomicDhtResponse msg) {
-
+        ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicNearResponse.class, new CI2<UUID, GridDhtAtomicNearResponse>() {
+            @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) {
+                processDhtAtomicNearResponse(uuid, msg);
             }
 
             @Override public String toString() {
-                return "GridDhtAtomicDeferredUpdateResponse handler " +
-                    "[msgIdx=" + GridNearAtomicDhtResponse.CACHE_MSG_IDX + ']';
+                return "GridDhtAtomicNearResponse handler " +
+                    "[msgIdx=" + GridDhtAtomicNearResponse.CACHE_MSG_IDX + ']';
             }
         });
 
@@ -1819,12 +1818,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         return;
                     }
 
-                    // Do not check topology version for CLOCK versioning since
-                    // partition exchange will wait for near update future (if future is on server node).
-                    // Also do not check topology version if topology was locked on near node by
+                    // Do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
-                        !needRemap(req.topologyVersion(), top.topologyVersion())) {
+                    if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
                         ClusterNode node = ctx.discovery().node(nodeId);
 
                         if (node == null) {
@@ -1836,19 +1832,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
-                        GridCacheVersion ver = req.updateVersion();
-
-                        if (ver == null) {
-                            // Assign next version for update inside entries lock.
-                            ver = ctx.versions().next(top.topologyVersion());
+                        // Assign next version for update inside entries lock.
+                        GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
 
-                            if (hasNear)
-                                res.nearVersion(ver);
+                        if (hasNear)
+                            res.nearVersion(ver);
 
-                            if (msgLog.isDebugEnabled()) {
-                                msgLog.debug("Assigned update version [futId=" + req.futureId() +
-                                    ", writeVer=" + ver + ']');
-                            }
+                        if (msgLog.isDebugEnabled()) {
+                            msgLog.debug("Assigned update version [futId=" + req.futureId() +
+                                ", writeVer=" + ver + ']');
                         }
 
                         assert ver != null : "Got null version for update request: " + req;
@@ -2413,7 +2405,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /**
      * Updates locked entries one-by-one.
      *
-     * @param node Originating node.
+     * @param nearNode Originating node.
      * @param hasNear {@code True} if originating node has near cache.
      * @param req Update request.
      * @param res Update response.
@@ -2429,7 +2421,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @throws GridCacheEntryRemovedException Should be never thrown.
      */
     private UpdateSingleResult updateSingle(
-        ClusterNode node,
+        ClusterNode nearNode,
         boolean hasNear,
         GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res,
@@ -2473,9 +2465,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
 
-                boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), entry.partition(),
-                    req.topologyVersion());
-
                 Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
 
                 Collection<UUID> readers = null;
@@ -2483,38 +2472,37 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 if (checkReaders) {
                     readers = entry.readers();
-                    filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+                    filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id()));
                 }
 
                 GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                     ver,
-                    node.id(),
+                    nearNode.id(),
                     locNodeId,
                     op,
                     writeVal,
                     req.invokeArguments(),
-                    (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()))
-                        && writeThrough() && !req.skipStore(),
+                    writeThrough() && !req.skipStore(),
                     !req.skipStore(),
                     sndPrevVal || req.returnValue(),
                     req.keepBinary(),
                     expiry,
-                    true,
-                    true,
-                    primary,
-                    ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
+                    /*event*/true,
+                    /*metrics*/true,
+                    /*primary*/true,
+                    /*verCheck*/false,
                     topVer,
                     req.filter(),
-                    replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+                    replicate ? DR_PRIMARY : DR_NONE,
                     newConflictTtl,
                     newConflictExpireTime,
                     newConflictVer,
-                    true,
+                    /*conflictResolve*/true,
                     intercept,
                     req.subjectId(),
                     taskName,
-                    null,
-                    null,
+                    /*prevVal*/null,
+                    /*updateCntr*/null,
                     dhtFut);
 
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
@@ -2535,7 +2523,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         EntryProcessor<Object, Object, Object> entryProcessor = null;
 
                         if (!readersOnly) {
-                            dhtFut.addWriteEntry(entry,
+                            dhtFut.addWriteEntry(
+                                nearNode.id(),
+                                entry,
                                 updRes.newValue(),
                                 entryProcessor,
                                 updRes.newTtl(),
@@ -2547,7 +2537,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         }
 
                         if (!F.isEmpty(filteredReaders))
-                            dhtFut.addNearWriteEntries(filteredReaders,
+                            dhtFut.addNearWriteEntries(
+                                nearNode.id(),
+                                filteredReaders,
                                 entry,
                                 updRes.newValue(),
                                 entryProcessor,
@@ -2562,8 +2554,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 if (hasNear) {
-                    if (primary && updRes.sendToDht()) {
-                        if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) {
+                    if (updRes.sendToDht()) {
+                        if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
                             // If put the same value as in request then do not need to send it back.
                             if (op == TRANSFORM || writeVal != updRes.newValue()) {
                                 res.addNearValue(i,
@@ -2575,13 +2567,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());
 
                             if (updRes.newValue() != null) {
-                                IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
+                                IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
 
                                 assert f == null : f;
                             }
                         }
-                        else if (F.contains(readers, node.id())) // Reader became primary or backup.
-                            entry.removeReader(node.id(), req.messageId());
+                        else if (F.contains(readers, nearNode.id())) // Reader became primary or backup.
+                            entry.removeReader(nearNode.id(), req.messageId());
                         else
                             res.addSkippedIndex(i);
                     }
@@ -2603,7 +2595,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) {
                         if (retVal == null)
-                            retVal = new GridCacheReturn(node.isLocal());
+                            retVal = new GridCacheReturn(nearNode.isLocal());
 
                         retVal.addEntryProcessResult(ctx,
                             k,
@@ -2619,7 +2611,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         CacheObject ret = updRes.oldValue();
 
                         retVal = new GridCacheReturn(ctx,
-                            node.isLocal(),
+                            nearNode.isLocal(),
                             req.keepBinary(),
                             req.returnValue() ? ret : null,
                             updRes.success());
@@ -2639,7 +2631,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param firstEntryIdx Index of the first entry in the request keys collection.
      * @param entries Entries to update.
      * @param ver Version to set.
-     * @param node Originating node.
+     * @param nearNode Originating node.
      * @param writeVals Write values.
      * @param putMap Values to put.
      * @param rmvKeys Keys to remove.
@@ -2661,7 +2653,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final int firstEntryIdx,
         final List<GridDhtCacheEntry> entries,
         final GridCacheVersion ver,
-        final ClusterNode node,
+        final ClusterNode nearNode,
         @Nullable final List<CacheObject> writeVals,
         @Nullable final Map<KeyCacheObject, CacheObject> putMap,
         @Nullable final Collection<KeyCacheObject> rmvKeys,
@@ -2765,12 +2757,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     if (checkReaders) {
                         readers = entry.readers();
-                        filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+                        filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id()));
                     }
 
                     GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                         ver,
-                        node.id(),
+                        nearNode.id(),
                         locNodeId,
                         op,
                         writeVal,
@@ -2831,7 +2823,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
 
                         if (!batchRes.readersOnly()) {
-                            dhtFut.addWriteEntry(entry,
+                            dhtFut.addWriteEntry(
+                                nearNode.id(),
+                                entry,
                                 writeVal,
                                 entryProcessor,
                                 updRes.newTtl(),
@@ -2843,7 +2837,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         }
 
                         if (!F.isEmpty(filteredReaders))
-                            dhtFut.addNearWriteEntries(filteredReaders,
+                            dhtFut.addNearWriteEntries(
+                                nearNode.id(),
+                                filteredReaders,
                                 entry,
                                 writeVal,
                                 entryProcessor,
@@ -2853,7 +2849,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     if (hasNear) {
                         if (primary) {
-                            if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) {
+                            if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
                                 int idx = firstEntryIdx + i;
 
                                 if (req.operation() == TRANSFORM) {
@@ -2866,13 +2862,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                     res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
 
                                 if (writeVal != null || entry.hasValue()) {
-                                    IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
+                                    IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
 
                                     assert f == null : f;
                                 }
                             }
-                            else if (readers.contains(node.id())) // Reader became primary or backup.
-                                entry.removeReader(node.id(), req.messageId());
+                            else if (readers.contains(nearNode.id())) // Reader became primary or backup.
+                                entry.removeReader(nearNode.id(), req.messageId());
                             else
                                 res.addSkippedIndex(firstEntryIdx + i);
                         }
@@ -3148,31 +3144,35 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean force
     ) {
-        if (!force) {
-            if (updateReq.fastMap())
-                return null;
-
-            AffinityTopologyVersion topVer = updateReq.topologyVersion();
-
-            Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer);
-
-            // We are on primary node for some key.
-            assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer +
-                ctx.kernalContext().discovery().discoCache(topVer) + ']';
-
-            if (nodes.size() == 1) {
-                if (log.isDebugEnabled())
-                    log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " +
-                        "[topVer=" + topVer + ", updateReq=" + updateReq + ']');
-
-                return null;
-            }
-        }
-
         if (updateReq.size() == 1)
             return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
         else
             return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+//        if (!force) {
+//            if (updateReq.fastMap())
+//                return null;
+//
+//            AffinityTopologyVersion topVer = updateReq.topologyVersion();
+//
+//            Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer);
+//
+//            // We are on primary node for some key.
+//            assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer +
+//                ctx.kernalContext().discovery().discoCache(topVer) + ']';
+//
+//            if (nodes.size() == 1) {
+//                if (log.isDebugEnabled())
+//                    log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " +
+//                        "[topVer=" + topVer + ", updateReq=" + updateReq + ']');
+//
+//                return null;
+//            }
+//        }
+//
+//        if (updateReq.size() == 1)
+//            return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+//        else
+//            return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
     }
 
     /**
@@ -3225,9 +3225,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         GridCacheVersion ver = req.writeVersion();
 
-        // Always send update reply.
-        GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureId(),
-            ctx.deploymentEnabled());
+        GridDhtAtomicNearResponse nearRes = ctx.config().getWriteSynchronizationMode() == FULL_SYNC ?
+            new GridDhtAtomicNearResponse(req.nearFutureId(), req.dhtNodes()) : null;
 
         Boolean replicate = ctx.isDrEnabled();
 
@@ -3311,39 +3310,113 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 // Ignore.
             }
             catch (IgniteCheckedException e) {
-                res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
+                if (nearRes != null)
+                    nearRes.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
+
+                U.error(log, "Failed to update key on backup node: " + key, e);
             }
         }
 
-        if (isNearEnabled(cacheCfg))
-            ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res);
+        GridDhtAtomicUpdateResponse dhtRes = null;
 
-        try {
-            if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) {
-                ctx.io().send(nodeId, res, ctx.ioPolicy());
+        if (isNearEnabled(cacheCfg)) {
+            List<KeyCacheObject> nearEvicted =
+                ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes);
 
-                if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Sent DHT atomic update response [futId=" + req.futureId() +
-                        ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
-                }
-            }
+            dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureId(), ctx.deploymentEnabled());
+
+            dhtRes.nearEvicted(nearEvicted);
+        }
+
+        final boolean RES_AFTER_ACK = false;
+
+        if (nearRes != null) {
+            if (RES_AFTER_ACK)
+                sendDhtNearResponse(nodeId, req, nearRes);
             else {
-                if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureId() +
-                        ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
-                }
+                sendDhtNearResponse(null, req, nearRes);
 
-                // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
                 sendDeferredUpdateResponse(nodeId, req.futureId());
             }
         }
+        else
+            sendDeferredUpdateResponse(nodeId, req.futureId());
+
+        if (dhtRes != null)
+            sendDhtPrimaryResponse(nodeId, req, dhtRes);
+    }
+
+    /**
+     * @param nodeId Primary node ID, the response is sent to this node.
+     * @param req Request, used only to fill in log messages.
+     * @param dhtRes Response to send.
+     */
+    private void sendDhtPrimaryResponse(UUID nodeId,
+        GridDhtAtomicAbstractUpdateRequest req,
+        GridDhtAtomicUpdateResponse dhtRes) {
+        try {
+            ctx.io().send(nodeId, dhtRes, ctx.ioPolicy());
+
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Sent DHT response [futId=" + req.futureId() +
+                    ", nearFutId=" + req.nearFutureId() +
+                    ", writeVer=" + req.writeVersion() +
+                    ", node=" + nodeId + ']');
+            }
+        }
         catch (ClusterTopologyCheckedException ignored) {
-            U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureId() +
-                ", node=" + req.nodeId() + ']');
+            U.warn(msgLog, "Failed to send DHT response, node left [futId=" + req.futureId() +
+                ", nearFutId=" + req.nearFutureId() +
+                ", node=" + nodeId + ']');
         }
         catch (IgniteCheckedException e) {
-            U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureId() +
-                ", node=" + nodeId +  ", res=" + res + ']', e);
+            U.error(msgLog, "Failed to send DHT response [futId=" + req.futureId() +
+                ", nearFutId=" + req.nearFutureId() +
+                ", node=" + nodeId +
+                ", res=" + dhtRes + ']', e);
+        }
+    }
+
+    /**
+     * @param req Request.
+     * @param nearRes Response to send.
+     */
+    private void sendDhtNearResponse(final UUID primaryId,
+        final GridDhtAtomicAbstractUpdateRequest req,
+        GridDhtAtomicNearResponse nearRes) {
+        try {
+            ClusterNode node = ctx.discovery().node(req.nearNodeId());
+
+            if (node == null)
+                throw new ClusterTopologyCheckedException("Node left: " + req.nearNodeId());
+
+            if (primaryId != null) {
+                ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy(), new IgniteInClosure<IgniteException>() {
+                    @Override public void apply(IgniteException e) {
+                        sendDeferredUpdateResponse(primaryId, req.futureId());
+                    }
+                });
+            }
+            else
+                ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy());
+
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Sent DHT near response [futId=" + req.futureId() +
+                    ", nearFutId=" + req.nearFutureId() +
+                    ", writeVer=" + req.writeVersion() +
+                    ", node=" + req.nearNodeId() + ']');
+            }
+        }
+        catch (ClusterTopologyCheckedException ignored) {
+            U.warn(msgLog, "Failed to send DHT near response, node left [futId=" + req.futureId() +
+                ", nearFutId=" + req.nearFutureId() +
+                ", node=" + req.nearNodeId() + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() +
+                ", nearFutId=" + req.nearFutureId() +
+                ", node=" + req.nearNodeId() +
+                ", res=" + nearRes + ']', e);
         }
     }
 
@@ -3359,8 +3432,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param nodeId Node ID.
      * @param res Response.
      */
-    private void processNearAtomicDhtResponse(UUID nodeId, GridNearAtomicDhtResponse res) {
+    private void processDhtAtomicNearResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+        GridNearAtomicAbstractUpdateFuture updateFut =
+            (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
 
+        if (updateFut != null) {
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Received DHT atomic near response [futId=" + res.futureId() +
+                    ", node=" + nodeId + ']');
+            }
+
+            updateFut.onResult(nodeId, res);
+        }
+        else {
+            U.warn(msgLog, "Failed to find update future DHT atomic near response [futId=" + res.futureId() +
+                ", node=" + nodeId +
+                ", res=" + res + ']');
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
new file mode 100644
index 0000000..628e1dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * TODO IGNITE-4705: do not send mapping if it == affinity?
+ */
+public class GridDhtAtomicNearResponse extends GridCacheMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    /** Flag mask: set when the response carries an operation result. */
+    private static final int HAS_RESULT_MASK = 0x1;
+
+    /** Flag mask: set when the carried operation result is a success. */
+    private static final int RESULT_SUCCESS_MASK = 0x2;
+
+    /** Future ID. */
+    private long futId;
+
+    /** Update mapping. */
+    @GridDirectCollection(UUID.class)
+    private List<UUID> mapping;
+
+    /** Bit flags, see {@link #HAS_RESULT_MASK} and {@link #RESULT_SUCCESS_MASK}. */
+    private byte flags;
+
+    /** Per-key update errors, created lazily when the first failed key is added. */
+    private UpdateErrors errors;
+
+    /**
+     * Default no-arg constructor, leaves all fields unset.
+     */
+    public GridDhtAtomicNearResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param mapping Update mapping.
+     */
+    public GridDhtAtomicNearResponse(long futId, List<UUID> mapping) {
+        this.futId = futId;
+        this.mapping = mapping;
+    }
+
+    /**
+     * @param key Key.
+     * @param e Error.
+     */
+    public void addFailedKey(KeyCacheObject key, Throwable e) {
+        if (errors == null)
+            errors = new UpdateErrors();
+
+        errors.addFailedKey(key, e);
+    }
+
+    /**
+     * @param success Success flag.
+     */
+    public void setResult(boolean success) {
+        setFlag(true, HAS_RESULT_MASK);
+
+        setFlag(success, RESULT_SUCCESS_MASK);
+    }
+
+    /**
+     * @return Operation result.
+     */
+    public GridCacheReturn result() {
+        assert hasResult();
+
+        return new GridCacheReturn(true, isFlag(RESULT_SUCCESS_MASK));
+    }
+
+    /**
+     * @return {@code True} if response contains operation result.
+     */
+    public boolean hasResult() {
+        return isFlag(HAS_RESULT_MASK);
+    }
+
+    /**
+     * @return Update mapping.
+     */
+    public List<UUID> mapping() {
+        return mapping;
+    }
+
+    /**
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reads flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -45;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 7;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        if (errors != null)
+            errors.prepareMarshal(this, ctx.cacheContext(cacheId));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (errors != null)
+            errors.finishUnmarshal(this, ctx.cacheContext(cacheId), ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeMessage("errors", errors))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                errors = reader.readMessage("errors");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtAtomicNearResponse.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 0c8e482..671034c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -44,9 +43,6 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** */
-    private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4");
-
     /** Future keys. */
     private KeyCacheObject key;
 
@@ -97,7 +93,8 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
 
     /** {@inheritDoc} */
     @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
-        ClusterNode node,
+        UUID nodeId,
+        UUID nearNodeId,
         long futId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
@@ -106,11 +103,13 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
         long conflictExpireTime,
         @Nullable GridCacheVersion conflictVer
     ) {
-        if (canUseSingleRequest(node, ttl, conflictExpireTime, conflictVer)) {
+        if (canUseSingleRequest(ttl, conflictExpireTime, conflictVer)) {
             return new GridDhtAtomicSingleUpdateRequest(
                 cctx.cacheId(),
-                node.id(),
+                nodeId,
                 futId,
+                nearNodeId,
+                updateReq.futureId(),
                 writeVer,
                 syncMode,
                 topVer,
@@ -123,8 +122,10 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
         else {
             return new GridDhtAtomicUpdateRequest(
                 cctx.cacheId(),
-                node.id(),
+                nodeId,
                 futId,
+                nearNodeId,
+                updateReq.futureId(),
                 writeVer,
                 syncMode,
                 topVer,
@@ -167,18 +168,15 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
     }
 
     /**
-     * @param node Target node
      * @param ttl TTL.
      * @param conflictExpireTime Conflict expire time.
      * @param conflictVer Conflict version.
      * @return {@code True} if it is possible to use {@link GridDhtAtomicSingleUpdateRequest}.
      */
-    private boolean canUseSingleRequest(ClusterNode node,
-        long ttl,
+    private boolean canUseSingleRequest(long ttl,
         long conflictExpireTime,
         @Nullable GridCacheVersion conflictVer) {
-        return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0 &&
-            (ttl == CU.TTL_NOT_CHANGED) &&
+        return (ttl == CU.TTL_NOT_CHANGED) && // All three checks must hold; otherwise createRequest builds GridDhtAtomicUpdateRequest.
             (conflictExpireTime == CU.EXPIRE_TIME_CALCULATE) &&
             conflictVer == null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index 127c2be..e46c843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -51,7 +51,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
     /** Near cache key flag. */
     private static final int NEAR_FLAG_MASK = 0x80;
 
-    /** Future version. */
+    /** Future ID on primary. */
     protected long futId;
 
     /** Write version. */
@@ -116,6 +116,8 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
         int cacheId,
         UUID nodeId,
         long futId,
+        UUID nearNodeId,
+        long nearFutId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
@@ -125,7 +127,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
         boolean keepBinary,
         boolean skipStore
     ) {
-        super(cacheId, nodeId);
+        super(cacheId, nodeId, nearNodeId, nearFutId);
         this.futId = futId;
         this.writeVer = writeVer;
         this.syncMode = syncMode;
@@ -423,73 +425,73 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
         }
 
         switch (writer.state()) {
-            case 3:
+            case 6:
                 if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
-            case 4:
+            case 7:
                 if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 5:
+            case 8:
                 if (!writer.writeMessage("key", key))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 9:
                 if (!writer.writeInt("partId", partId))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 10:
                 if (!writer.writeMessage("prevVal", prevVal))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 11:
                 if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 12:
                 if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
-            case 10:
+            case 13:
                 if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
-            case 11:
+            case 14:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
-            case 12:
+            case 15:
                 if (!writer.writeLong("updateCntr", updateCntr))
                     return false;
 
                 writer.incrementState();
 
-            case 13:
+            case 16:
                 if (!writer.writeMessage("val", val))
                     return false;
 
                 writer.incrementState();
 
-            case 14:
+            case 17:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -511,7 +513,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
             return false;
 
         switch (reader.state()) {
-            case 3:
+            case 6:
                 flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
@@ -519,7 +521,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 4:
+            case 7:
                 futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
@@ -527,7 +529,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 5:
+            case 8:
                 key = reader.readMessage("key");
 
                 if (!reader.isLastRead())
@@ -535,7 +537,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 6:
+            case 9:
                 partId = reader.readInt("partId");
 
                 if (!reader.isLastRead())
@@ -543,7 +545,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 7:
+            case 10:
                 prevVal = reader.readMessage("prevVal");
 
                 if (!reader.isLastRead())
@@ -551,7 +553,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 8:
+            case 11:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -559,7 +561,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 9:
+            case 12:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -571,7 +573,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 10:
+            case 13:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -579,7 +581,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 11:
+            case 14:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -587,7 +589,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 12:
+            case 15:
                 updateCntr = reader.readLong("updateCntr");
 
                 if (!reader.isLastRead())
@@ -595,7 +597,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 13:
+            case 16:
                 val = reader.readMessage("val");
 
                 if (!reader.isLastRead())
@@ -603,7 +605,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 14:
+            case 17:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -653,7 +655,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 15;
+        return 18; // One more than the highest state index (17) handled in writeTo/readFrom.
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 7cb75fa..ea6a1b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -122,7 +122,9 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     }
 
     /** {@inheritDoc} */
-    @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(ClusterNode node,
+    @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
+        UUID nodeId,
+        UUID nearNodeId,
         long futId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
@@ -133,8 +135,10 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     ) {
         return new GridDhtAtomicUpdateRequest(
             cctx.cacheId(),
-            node.id(),
+            nodeId,
             futId,
+            nearNodeId,
+            updateReq.futureId(),
             writeVer,
             syncMode,
             topVer,


Mime
View raw message