ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [8/8] ignite git commit: ignite-4705 Atomic cache protocol change: notify client node from backups
Date Mon, 13 Mar 2017 15:09:42 GMT
ignite-4705 Atomic cache protocol change: notify client node from backups


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cbc472fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cbc472fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cbc472fe

Branch: refs/heads/ignite-2.0
Commit: cbc472fe7f058db42ce49652c85981c7b797d229
Parents: f59f46d
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Mar 13 18:07:20 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Mar 13 18:08:50 2017 +0300

----------------------------------------------------------------------
 .../rest/protocols/tcp/MockNioSession.java      |   5 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |   5 +-
 .../internal/binary/BinaryObjectImpl.java       |  43 +-
 .../connection/GridClientNioTcpConnection.java  |   2 +-
 .../managers/communication/GridIoManager.java   |   5 +-
 .../communication/GridIoMessageFactory.java     |  20 +-
 .../processors/cache/CacheObjectContext.java    |   3 +-
 .../processors/cache/GridCacheAtomicFuture.java |   5 +-
 .../processors/cache/GridCacheIoManager.java    |  83 +-
 .../processors/cache/GridCacheMapEntry.java     |  12 +-
 .../processors/cache/GridCacheMessage.java      |  17 +-
 .../processors/cache/GridCacheMvccManager.java  |  48 +-
 .../processors/cache/GridCacheProcessor.java    |   1 -
 .../processors/cache/GridCacheReturn.java       |   6 +-
 .../cache/GridDeferredAckMessageSender.java     |  17 +-
 .../processors/cache/KeyCacheObjectImpl.java    |  65 +-
 .../dht/GridClientPartitionTopology.java        |   8 +
 .../dht/GridDhtPartitionTopology.java           |   9 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  23 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      | 298 +++---
 .../GridDhtAtomicAbstractUpdateRequest.java     | 392 +++++++-
 .../dht/atomic/GridDhtAtomicCache.java          | 896 +++++++++--------
 .../GridDhtAtomicDeferredUpdateResponse.java    |  68 +-
 .../dht/atomic/GridDhtAtomicNearResponse.java   | 314 ++++++
 .../atomic/GridDhtAtomicSingleUpdateFuture.java | 101 +-
 .../GridDhtAtomicSingleUpdateRequest.java       | 277 +-----
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  89 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  | 325 ++-----
 .../dht/atomic/GridDhtAtomicUpdateResponse.java | 124 +--
 ...idNearAtomicAbstractSingleUpdateRequest.java | 481 +---------
 .../GridNearAtomicAbstractUpdateFuture.java     | 468 +++++++--
 .../GridNearAtomicAbstractUpdateRequest.java    | 480 ++++++++-
 .../GridNearAtomicCheckUpdateRequest.java       | 175 ++++
 .../atomic/GridNearAtomicFullUpdateRequest.java | 487 +---------
 ...GridNearAtomicSingleUpdateFilterRequest.java |  23 +-
 .../GridNearAtomicSingleUpdateFuture.java       | 617 ++++++------
 ...GridNearAtomicSingleUpdateInvokeRequest.java |  37 +-
 .../GridNearAtomicSingleUpdateRequest.java      |  65 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 962 +++++++++++--------
 .../atomic/GridNearAtomicUpdateResponse.java    | 192 ++--
 .../distributed/dht/atomic/UpdateErrors.java    | 222 +++++
 .../distributed/near/GridNearAtomicCache.java   |  27 +-
 .../continuous/CacheContinuousQueryHandler.java |   2 +-
 .../cacheobject/IgniteCacheObjectProcessor.java |   5 +-
 .../IgniteCacheObjectProcessorImpl.java         |  18 +-
 .../ignite/internal/util/StripedExecutor.java   |   8 +-
 .../util/future/GridCompoundFuture.java         |  11 +-
 .../internal/util/ipc/IpcToNioAdapter.java      |   7 +-
 .../nio/GridConnectionBytesVerifyFilter.java    |   7 +-
 .../util/nio/GridNioAsyncNotifyFilter.java      |   7 +-
 .../internal/util/nio/GridNioCodecFilter.java   |   9 +-
 .../util/nio/GridNioEmbeddedFuture.java         |   7 +
 .../ignite/internal/util/nio/GridNioFilter.java |  12 +-
 .../internal/util/nio/GridNioFilterAdapter.java |   7 +-
 .../internal/util/nio/GridNioFilterChain.java   |  15 +-
 .../util/nio/GridNioFinishedFuture.java         |   5 -
 .../ignite/internal/util/nio/GridNioFuture.java |   7 -
 .../internal/util/nio/GridNioFutureImpl.java    |  18 +-
 .../ignite/internal/util/nio/GridNioServer.java |  83 +-
 .../internal/util/nio/GridNioSession.java       |   7 +-
 .../internal/util/nio/GridNioSessionImpl.java   |   9 +-
 .../util/nio/GridNioSessionMetaKey.java         |   5 +-
 .../util/nio/GridShmemCommunicationClient.java  |   6 +-
 .../util/nio/GridTcpNioCommunicationClient.java |  13 +-
 .../internal/util/nio/SessionWriteRequest.java  |   7 -
 .../internal/util/nio/ssl/GridNioSslFilter.java |  12 +-
 .../util/nio/ssl/GridNioSslHandler.java         |  29 +-
 .../communication/tcp/TcpCommunicationSpi.java  |   2 +-
 .../org/apache/ignite/thread/IgniteThread.java  |  41 +-
 .../ignite/thread/IgniteThreadFactory.java      |   2 +-
 .../GridCommunicationSendMessageSelfTest.java   |   2 +-
 .../cache/CacheRebalancingSelfTest.java         |  16 +-
 .../GridCacheAbstractFailoverSelfTest.java      |   2 -
 .../GridCacheAtomicMessageCountSelfTest.java    |  22 +-
 .../IgniteCacheEntryListenerAbstractTest.java   |   1 +
 ...niteCacheClientNodeChangingTopologyTest.java |   7 -
 .../IgniteCacheMessageRecoveryAbstractTest.java |   2 +-
 .../dht/GridCacheAtomicNearCacheSelfTest.java   |  23 +-
 .../IgniteCachePutRetryAbstractSelfTest.java    |  37 +-
 ...gniteCachePutRetryTransactionalSelfTest.java |   2 +-
 .../atomic/IgniteCacheAtomicProtocolTest.java   | 883 +++++++++++++++++
 ...erNoStripedPoolMultiNodeFullApiSelfTest.java |  35 -
 .../near/GridCacheNearReadersSelfTest.java      |  17 +-
 ...edNoStripedPoolMultiNodeFullApiSelfTest.java |  35 -
 ...eContinuousQueryAsyncFilterListenerTest.java |   2 +-
 ...ContinuousQueryFailoverAbstractSelfTest.java |  31 +-
 ...eCacheContinuousQueryImmutableEntryTest.java |   2 +-
 .../nio/GridNioEmbeddedFutureSelfTest.java      |   2 +-
 .../util/future/nio/GridNioFutureSelfTest.java  |  25 +-
 .../nio/impl/GridNioFilterChainSelfTest.java    |  12 +-
 .../file/GridFileSwapSpaceSpiSelfTest.java      |   2 +-
 .../IgniteCacheFullApiSelfTestSuite.java        |   8 +-
 .../testsuites/IgniteCacheTestSuite5.java       |   3 +
 .../HadoopExternalCommunication.java            |   9 +-
 .../communication/HadoopIpcToNioAdapter.java    |   7 +-
 .../communication/HadoopMarshallerFilter.java   |  10 +-
 .../cache/IgniteGetAndPutBenchmark.java         |   2 +-
 .../cache/IgniteGetAndPutTxBenchmark.java       |   2 +-
 98 files changed, 5462 insertions(+), 3597 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
index 9bc4e7f..9d1755f 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
@@ -19,11 +19,13 @@ package org.apache.ignite.internal.processors.rest.protocols.tcp;
 
 import java.net.InetSocketAddress;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
 import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
 import org.apache.ignite.internal.util.nio.GridNioFuture;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -112,7 +114,8 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS
     }
 
     /** {@inheritDoc} */
-    @Override public void sendNoFuture(Object msg) throws IgniteCheckedException {
+    @Override public void sendNoFuture(Object msg, @Nullable IgniteInClosure<IgniteException> ackC)
+        throws IgniteCheckedException {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 2d35cdb..f6cfe12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1703,8 +1703,9 @@ public class IgnitionEx {
 
             sysExecSvc.allowCoreThreadTimeOut(true);
 
-            if (cfg.getStripedPoolSize() > 0)
-                stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getIgniteInstanceName(), "sys", log);
+            validateThreadPoolSize(cfg.getStripedPoolSize(), "stripedPool");
+
+            stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getIgniteInstanceName(), "sys", log);
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index 7a81659..6fe1a3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -17,6 +17,17 @@
 
 package org.apache.ignite.internal.binary;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectException;
@@ -33,19 +44,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.UUID;
-
-import static java.nio.charset.StandardCharsets.*;
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 /**
  * Binary object implementation.
@@ -74,7 +73,6 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
     private boolean detachAllowed;
 
     /** */
-    @GridDirectTransient
     private int part = -1;
 
     /**
@@ -561,7 +559,6 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
 
         start = in.readInt();
     }
-
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
@@ -584,6 +581,12 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
                 writer.incrementState();
 
             case 1:
+                if (!writer.writeInt("part", part))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
                 if (!writer.writeInt("start", detachAllowed ? 0 : start))
                     return false;
 
@@ -611,6 +614,14 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
                 reader.incrementState();
 
             case 1:
+                part = reader.readInt("part");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
                 start = reader.readInt("start");
 
                 if (!reader.isLastRead())
@@ -620,7 +631,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
 
         }
 
-        return true;
+        return reader.afterMessageRead(BinaryObjectImpl.class);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index 8937504..d3a30fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -229,7 +229,7 @@ public class GridClientNioTcpConnection extends GridClientConnection {
             GridNioFuture<?> sslHandshakeFut = null;
 
             if (sslCtx != null) {
-                sslHandshakeFut = new GridNioFutureImpl<>();
+                sslHandshakeFut = new GridNioFutureImpl<>(null);
 
                 meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 0c0dbf7..23738d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -818,10 +818,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             return;
         }
 
-        if (ctx.config().getStripedPoolSize() > 0 &&
-            plc == GridIoPolicy.SYSTEM_POOL &&
-            msg.partition() != Integer.MIN_VALUE
-            ) {
+        if (plc == GridIoPolicy.SYSTEM_POOL && msg.partition() != Integer.MIN_VALUE) {
             ctx.getStripedExecutorService().execute(msg.partition(), c);
 
             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6f95400..0548581 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -67,14 +67,17 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicCheckUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -118,11 +121,11 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
-import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
 import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
 import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
 import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage;
@@ -173,6 +176,21 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -47:
+                msg = new GridNearAtomicCheckUpdateRequest();
+
+                break;
+
+            case -46:
+                msg = new UpdateErrors();
+
+                break;
+
+            case -45:
+                msg = new GridDhtAtomicNearResponse();
+
+                break;
+
             case -44:
                 msg = new TcpCommunicationSpi.HandshakeMessage2();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
index c4203ef..a777ab6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
@@ -33,7 +33,8 @@ import org.apache.ignite.internal.util.typedef.F;
 /**
  *
  */
-@SuppressWarnings("TypeMayBeWeakened") public class CacheObjectContext {
+@SuppressWarnings("TypeMayBeWeakened")
+public class CacheObjectContext {
     /** */
     private GridKernalContext kernalCtx;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 3e11d50..8df229e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -19,16 +19,15 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
 /**
  * Update future for atomic cache.
  */
 public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
     /**
-     * @return Future version.
+     * @return Future ID.
      */
-    public GridCacheVersion version();
+    public Long id();
 
     /**
      * Gets future that will be completed when it is safe when update is finished on the given version of topology.

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 1f28201..1cd8fbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -47,15 +47,18 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicCheckUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -85,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -213,15 +217,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             }
 
             if (fut != null && !fut.isDone()) {
+                Thread curThread = Thread.currentThread();
+
+                final int stripe = curThread instanceof IgniteThread ? ((IgniteThread)curThread).stripe() : -1;
+
                 fut.listen(new CI1<IgniteInternalFuture<?>>() {
                     @Override public void apply(IgniteInternalFuture<?> t) {
-                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                        Runnable c = new Runnable() {
                             @Override public void run() {
                                 IgniteLogger log = cacheMsg.messageLogger(cctx);
 
                                 if (log.isDebugEnabled()) {
                                     StringBuilder msg0 = new StringBuilder("Process cache message after wait for " +
-                                        "affinity topology version [");
+                                            "affinity topology version [");
 
                                     appendMessageInfo(cacheMsg, nodeId, msg0).append(']');
 
@@ -230,7 +238,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                                 handleMessage(nodeId, cacheMsg);
                             }
-                        });
+                        };
+
+                        if (stripe >= 0)
+                            cctx.kernalContext().getStripedExecutorService().execute(stripe, c);
+                        else
+                            cctx.kernalContext().closure().runLocalSafe(c);
                     }
                 });
 
@@ -471,15 +484,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param cacheMsg Cache message.
      * @return Atomic future ID if applicable for message.
      */
-    @Nullable private GridCacheVersion atomicFututeId(GridCacheMessage cacheMsg) {
+    @Nullable private Long atomicFututeId(GridCacheMessage cacheMsg) {
         if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
-            return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
+            return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureId();
         else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
-            return ((GridNearAtomicUpdateResponse) cacheMsg).futureVersion();
+            return ((GridNearAtomicUpdateResponse) cacheMsg).futureId();
         else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
-            return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
+            return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureId();
         else if (cacheMsg instanceof GridDhtAtomicUpdateResponse)
-            return ((GridDhtAtomicUpdateResponse) cacheMsg).futureVersion();
+            return ((GridDhtAtomicUpdateResponse) cacheMsg).futureId();
+        else if (cacheMsg instanceof GridNearAtomicCheckUpdateRequest)
+            return ((GridNearAtomicCheckUpdateRequest)cacheMsg).futureId();
 
         return null;
     }
@@ -490,9 +505,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @return Atomic future ID if applicable for message.
      */
     @Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) {
-        if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
-            return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).updateVersion();
-        else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
+        if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
             return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).writeVersion();
 
         return null;
@@ -561,12 +574,25 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
                     ctx.cacheId(),
-                    req.futureVersion(),
+                    req.partition(),
+                    req.futureId(),
                     ctx.deploymentEnabled());
 
                 res.onError(req.classError());
 
                 sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+
+                if (req.nearNodeId() != null) {
+                    GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
+                        req.partition(),
+                        req.nearFutureId(),
+                        nodeId,
+                        req.flags());
+
+                    nearRes.errors(new UpdateErrors(req.classError()));
+
+                    sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy());
+                }
             }
 
             break;
@@ -577,7 +603,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
-                    req.futureVersion(),
+                    req.futureId(),
+                    req.partition(),
+                    false,
                     ctx.deploymentEnabled());
 
                 res.error(req.classError());
@@ -755,7 +783,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
-                    req.futureVersion(),
+                    req.futureId(),
+                    req.partition(),
+                    false,
                     ctx.deploymentEnabled());
 
                 res.error(req.classError());
@@ -771,7 +801,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
-                    req.futureVersion(),
+                    req.futureId(),
+                    req.partition(),
+                    false,
                     ctx.deploymentEnabled());
 
                 res.error(req.classError());
@@ -787,7 +819,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
-                    req.futureVersion(),
+                    req.futureId(),
+                    req.partition(),
+                    false,
                     ctx.deploymentEnabled());
 
                 res.error(req.classError());
@@ -802,12 +836,25 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
                     ctx.cacheId(),
-                    req.futureVersion(),
+                    req.partition(),
+                    req.futureId(),
                     ctx.deploymentEnabled());
 
                 res.onError(req.classError());
 
                 sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+
+                if (req.nearNodeId() != null) {
+                    GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
+                        req.partition(),
+                        req.nearFutureId(),
+                        nodeId,
+                        req.flags());
+
+                    nearRes.errors(new UpdateErrors(req.classError()));
+
+                    sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy());
+                }
             }
 
             break;
@@ -894,7 +941,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      */
     @SuppressWarnings("unchecked")
     public void send(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
-        assert !node.isLocal();
+        assert !node.isLocal() : node;
 
         if (!onSend(msg, node.id()))
             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 2237e22..54b4ed7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2170,8 +2170,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                     assert conflictCtx != null;
 
-                    boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
-
                     // Use old value?
                     if (conflictCtx.isUseOld()) {
                         GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
@@ -2180,7 +2178,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         if (!isNew() &&                                                                       // Not initial value,
                             verCheck &&                                                                       // and atomic version check,
                             oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() &&                 // and data centers are equal,
-                            ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
+                            ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, true) == 0 && // and both versions are equal,
                             cctx.writeThrough() &&                                                            // and store is enabled,
                             primary)                                                                          // and we are primary.
                         {
@@ -2226,13 +2224,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     conflictVer = null;
             }
 
-            boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
-
             // Perform version check only in case there was no explicit conflict resolution.
             if (conflictCtx == null) {
                 if (verCheck) {
-                    if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) >= 0) {
-                        if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
+                    if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) >= 0) {
+                        if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) == 0 && cctx.writeThrough() && primary) {
                             if (log.isDebugEnabled())
                                 log.debug("Received entry update with same version as current (will update store) " +
                                     "[entry=" + this + ", newVer=" + newVer + ']');
@@ -2307,7 +2303,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     }
                 }
                 else
-                    assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 :
+                    assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) <= 0 :
                         "Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 0646d5a..4de465c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -50,7 +50,7 @@ public abstract class GridCacheMessage implements Message {
     private static final long serialVersionUID = 0L;
 
     /** Maximum number of cache lookup indexes. */
-    public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 5;
+    public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 7;
 
     /** Cache message index field name. */
     public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX";
@@ -501,7 +501,7 @@ public abstract class GridCacheMessage implements Message {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    protected final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col,
+    public final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col,
         GridCacheContext ctx) throws IgniteCheckedException {
         if (col == null)
             return;
@@ -553,7 +553,7 @@ public abstract class GridCacheMessage implements Message {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    protected final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col,
+    public final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col,
         GridCacheContext ctx,
         ClassLoader ldr)
         throws IgniteCheckedException
@@ -701,6 +701,17 @@ public abstract class GridCacheMessage implements Message {
         return reader.afterMessageRead(GridCacheMessage.class);
     }
 
+    /**
+     * @param str Bulder.
+     * @param name Flag name.
+     */
+    protected final void appendFlag(StringBuilder str, String name) {
+        if (str.length() > 0)
+            str.append('|');
+
+        str.append(name);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheMessage.class, this, "cacheId", cacheId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 4ec13fc..dff2c88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -105,9 +106,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     @GridToStringExclude
     private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap();
 
+    /** */
+    private final AtomicLong atomicFutId = new AtomicLong(U.currentTimeMillis());
+
     /** Pending atomic futures. */
-    private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts =
-        new ConcurrentHashMap8<>();
+    private final ConcurrentHashMap8<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
 
     /** Pending data streamer futures. */
     private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet<>();
@@ -253,10 +256,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                 cacheFut.onNodeLeft(discoEvt.eventNode().id());
 
                 if (cacheFut.isCancelled() || cacheFut.isDone()) {
-                    GridCacheVersion futVer = cacheFut.version();
+                    Long futId = cacheFut.id();
 
-                    if (futVer != null)
-                        atomicFuts.remove(futVer, cacheFut);
+                    if (futId != null)
+                        atomicFuts.remove(futId, cacheFut);
                 }
             }
         }
@@ -423,14 +426,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @param futVer Future ID.
+     * @return ID for atomic cache update future.
+     */
+    public long atomicFutureId() {
+        return atomicFutId.incrementAndGet();
+    }
+
+    /**
+     * @param futId Future ID.
      * @param fut Future.
      * @return {@code False} if future was forcibly completed with error.
      */
-    public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) {
-        IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut);
+    public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture<?> fut) {
+        IgniteInternalFuture<?> old = atomicFuts.put(futId, fut);
 
-        assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
+        assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']';
 
         return onFutureAdded(fut);
     }
@@ -443,6 +453,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return Number of pending atomic futures.
+     */
+    public int atomicFuturesCount() {
+        return atomicFuts.size();
+    }
+
+    /**
      * @return Collection of pending data streamer futures.
      */
     public Collection<DataStreamerFuture> dataStreamerFutures() {
@@ -452,19 +469,19 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     /**
      * Gets future by given future ID.
      *
-     * @param futVer Future ID.
+     * @param futId Future ID.
      * @return Future.
      */
-    @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer) {
-        return atomicFuts.get(futVer);
+    @Nullable public IgniteInternalFuture<?> atomicFuture(Long futId) {
+        return atomicFuts.get(futId);
     }
 
     /**
-     * @param futVer Future ID.
+     * @param futId Future ID.
      * @return Removed future.
      */
-    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer) {
-        return atomicFuts.remove(futVer);
+    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(Long futId) {
+        return atomicFuts.remove(futId);
     }
 
     /**
@@ -481,6 +498,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
 
     /**
      * @param topVer Topology version.
+     * @return Future.
      */
     public GridFutureAdapter addDataStreamerFuture(AffinityTopologyVersion topVer) {
         final DataStreamerFuture fut = new DataStreamerFuture(topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f7ac812..c7ac31a 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -859,7 +859,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         if (!ctx.clientNode() && !ctx.isDaemon())
             addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000));
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
index 02c882c..c5d4066 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
@@ -126,12 +126,10 @@ public class GridCacheReturn implements Externalizable, Message {
     }
 
     /**
-     * Checks if value is not {@code null}.
      *
-     * @return {@code True} if value is not {@code null}.
      */
-    public boolean hasValue() {
-        return v != null;
+    public boolean emptyResult() {
+        return !invokeRes && v  == null && cacheObj == null && success;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
index 7145dc2..5265ec9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -41,30 +41,31 @@ public abstract class GridDeferredAckMessageSender {
     private GridTimeoutProcessor time;
 
     /** Closure processor. */
-    public GridClosureProcessor closure;
+    public GridClosureProcessor c;
 
     /**
      * @param time Time.
-     * @param closure Closure.
+     * @param c Closure.
      */
     public GridDeferredAckMessageSender(GridTimeoutProcessor time,
-        GridClosureProcessor closure) {
+        GridClosureProcessor c) {
         this.time = time;
-        this.closure = closure;
+        this.c = c;
     }
 
     /**
-     *
+     * @return Timeout.
      */
     public abstract int getTimeout();
 
     /**
-     *
+     * @return Buffer size.
      */
     public abstract int getBufferSize();
 
     /**
-     *
+     * @param nodeId Node ID.
+     * @param vers Versions to send.
      */
     public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers);
 
@@ -151,7 +152,7 @@ public abstract class GridDeferredAckMessageSender {
         /** {@inheritDoc} */
         @Override public void onTimeout() {
             if (guard.compareAndSet(false, true)) {
-                closure.runLocalSafe(new Runnable() {
+                c.runLocalSafe(new Runnable() {
                     @Override public void run() {
                         writeLock().lock();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index 146e554..4f8570c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -29,7 +31,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
     private static final long serialVersionUID = 0L;
 
     /** */
-    @GridDirectTransient
     private int part = -1;
 
     /**
@@ -42,14 +43,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
     /**
      * @param val Value.
      * @param valBytes Value bytes.
-     */
-    public KeyCacheObjectImpl(Object val, byte[] valBytes) {
-        this(val, valBytes, -1);
-    }
-
-    /**
-     * @param val Value.
-     * @param valBytes Value bytes.
      * @param part Partition.
      */
     public KeyCacheObjectImpl(Object val, byte[] valBytes, int part) {
@@ -130,7 +123,57 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 1;
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 1:
+                part = reader.readInt("part");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(KeyCacheObjectImpl.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 1:
+                if (!writer.writeInt("part", part))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 2af822a..6ca15de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -32,6 +32,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -409,6 +410,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public List<ClusterNode> nodes(int p,
+        AffinityAssignment affAssignment,
+        List<ClusterNode> affNodes) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         lock.readLock().lock();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index bdd84b0..605150a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -170,6 +171,14 @@ public interface GridDhtPartitionTopology {
 
     /**
      * @param p Partition ID.
+     * @param affAssignment Assignments.
+     * @param affNodes Node assigned for given partition by affinity.
+     * @return Collection of all nodes responsible for this partition with primary node being first.
+     */
+    @Nullable public List<ClusterNode> nodes(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes);
+
+    /**
+     * @param p Partition ID.
      * @return Collection of all nodes who {@code own} this partition.
      */
     public List<ClusterNode> owners(int p);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 49de280..53257d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -828,11 +828,32 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public List<ClusterNode> nodes(int p,
+        AffinityAssignment affAssignment,
+        List<ClusterNode> affNodes) {
+        return nodes0(p, affAssignment, affNodes);
+    }
+
+    /** {@inheritDoc} */
     @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
 
         List<ClusterNode> affNodes = affAssignment.get(p);
 
+        List<ClusterNode> nodes = nodes0(p, affAssignment, affNodes);
+
+        return nodes != null ? nodes : affNodes;
+    }
+
+    /**
+     * @param p Partition.
+     * @param affAssignment Assignments.
+     * @param affNodes Node assigned for given partition by affinity.
+     * @return Nodes responsible for given partition (primary is first).
+     */
+    @Nullable private List<ClusterNode> nodes0(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes) {
+        AffinityTopologyVersion topVer = affAssignment.topologyVersion();
+
         lock.readLock().lock();
 
         try {
@@ -866,7 +887,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            return nodes != null ? nodes : affNodes;
+            return nodes;
         }
         finally {
             lock.readLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 4cb113e..5ff5aa4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -30,10 +31,13 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -41,14 +45,15 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
  * DHT atomic cache backup update future.
@@ -74,56 +79,38 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     protected final GridCacheContext cctx;
 
     /** Future version. */
-    protected final GridCacheVersion futVer;
-
-    /** Completion callback. */
-    @GridToStringExclude
-    private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+    @GridToStringInclude
+    protected final long futId;
 
     /** Update request. */
-    protected final GridNearAtomicAbstractUpdateRequest updateReq;
-
-    /** Update response. */
-    final GridNearAtomicUpdateResponse updateRes;
+    final GridNearAtomicAbstractUpdateRequest updateReq;
 
     /** Mappings. */
-    @GridToStringInclude
+    @GridToStringExclude
     protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings;
 
     /** Continuous query closures. */
     private Collection<CI1<Boolean>> cntQryClsrs;
 
-    /** */
-    private final boolean waitForExchange;
-
     /** Response count. */
     private volatile int resCnt;
 
     /**
      * @param cctx Cache context.
-     * @param completionCb Callback to invoke when future is completed.
      * @param writeVer Write version.
      * @param updateReq Update request.
-     * @param updateRes Update response.
      */
     protected GridDhtAtomicAbstractUpdateFuture(
         GridCacheContext cctx,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq,
-        GridNearAtomicUpdateResponse updateRes
+        GridNearAtomicAbstractUpdateRequest updateReq
     ) {
         this.cctx = cctx;
 
-        this.futVer = cctx.isLocalNode(updateRes.nodeId()) ?
-            cctx.versions().next(updateReq.topologyVersion()) : // Generate new if request mapped to local.
-            updateReq.futureVersion();
         this.updateReq = updateReq;
-        this.completionCb = completionCb;
-        this.updateRes = updateRes;
         this.writeVer = writeVer;
 
-        waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
+        futId = cctx.mvcc().atomicFutureId();
 
         if (log == null) {
             msgLog = cctx.shared().atomicMessageLogger();
@@ -131,8 +118,15 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         }
     }
 
+    /**
+     * @return {@code True} if all updates are sent to DHT.
+     */
+    protected abstract boolean sendAllToDht();
+
     /** {@inheritDoc} */
     @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+        boolean waitForExchange = !updateReq.topologyLocked();
+
         if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
             return this;
 
@@ -141,17 +135,23 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /**
      * @param clsr Continuous query closure.
+     * @param sync Synchronous continuous query flag.
      */
-    public final void addContinuousQueryClosure(CI1<Boolean> clsr) {
+    public final void addContinuousQueryClosure(CI1<Boolean> clsr, boolean sync) {
         assert !isDone() : this;
 
-        if (cntQryClsrs == null)
-            cntQryClsrs = new ArrayList<>(10);
+        if (sync)
+            clsr.apply(true);
+        else {
+            if (cntQryClsrs == null)
+                cntQryClsrs = new ArrayList<>(10);
 
-        cntQryClsrs.add(clsr);
+            cntQryClsrs.add(clsr);
+        }
     }
 
     /**
+     * @param affAssignment Affinity assignment.
      * @param entry Entry to map.
      * @param val Value to write.
      * @param entryProcessor Entry processor.
@@ -163,7 +163,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @param updateCntr Partition update counter.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    final void addWriteEntry(GridDhtCacheEntry entry,
+    final void addWriteEntry(
+        AffinityAssignment affAssignment,
+        GridDhtCacheEntry entry,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
@@ -174,7 +176,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         long updateCntr) {
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
-        List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
+        List<ClusterNode> affNodes = affAssignment.get(entry.partition());
+
+        List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), affAssignment, affNodes);
+
+        if (dhtNodes == null)
+            dhtNodes = affNodes;
 
         if (log.isDebugEnabled())
             log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
@@ -193,8 +200,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
                 if (updateReq == null) {
                     updateReq = createRequest(
-                        node,
-                        futVer,
+                        node.id(),
+                        futId,
                         writeVer,
                         syncMode,
                         topVer,
@@ -212,7 +219,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     conflictExpireTime,
                     conflictVer,
                     addPrevVal,
-                    entry.partition(),
                     prevVal,
                     updateCntr);
             }
@@ -239,7 +245,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @param ttl TTL for near cache update (optional).
      * @param expireTime Expire time for near cache update (optional).
      */
-    final void addNearWriteEntries(Collection<UUID> readers,
+    final void addNearWriteEntries(
+        Collection<UUID> readers,
         GridDhtCacheEntry entry,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
@@ -262,8 +269,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     continue;
 
                 updateReq = createRequest(
-                    node,
-                    futVer,
+                    node.id(),
+                    futId,
                     writeVer,
                     syncMode,
                     topVer,
@@ -274,8 +281,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                 mappings.put(nodeId, updateReq);
             }
 
-            addNearReaderEntry(entry);
-
             updateReq.addNearWriteValue(entry.key(),
                 val,
                 entryProcessor,
@@ -284,12 +289,15 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         }
     }
 
-    /**
-     * adds new nearReader.
-     *
-     * @param entry GridDhtCacheEntry.
-     */
-    protected abstract void addNearReaderEntry(GridDhtCacheEntry entry);
+    /** {@inheritDoc} */
+    @Override public final IgniteUuid futureId() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public final Long id() {
+        return futId;
+    }
 
     /**
      * @return Write version.
@@ -299,21 +307,11 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /** {@inheritDoc} */
-    @Override public final IgniteUuid futureId() {
-        return futVer.asGridUuid();
-    }
-
-    /** {@inheritDoc} */
-    @Override public final GridCacheVersion version() {
-        return futVer;
-    }
-
-    /** {@inheritDoc} */
     @Override public final boolean onNodeLeft(UUID nodeId) {
         boolean res = registerResponse(nodeId);
 
         if (res && msgLog.isDebugEnabled()) {
-            msgLog.debug("DTH update fut, node left [futId=" + futVer + ", writeVer=" + writeVer +
+            msgLog.debug("DTH update fut, node left [futId=" + futId + ", writeVer=" + writeVer +
                 ", node=" + nodeId + ']');
         }
 
@@ -324,7 +322,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @param nodeId Node ID.
      * @return {@code True} if request found.
      */
-    final boolean registerResponse(UUID nodeId) {
+    private boolean registerResponse(UUID nodeId) {
         int resCnt0;
 
         GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
@@ -353,41 +351,103 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /**
      * Sends requests to remote nodes.
+     *
+     * @param nearNode Near node.
+     * @param ret Cache operation return value.
+     * @param updateRes Response.
+     * @param completionCb Callback to invoke to send response to near node.
+     */
+    final void map(ClusterNode nearNode,
+        GridCacheReturn ret,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb) {
+        if (F.isEmpty(mappings)) {
+            updateRes.dhtNodes(Collections.<UUID>emptyList());
+
+            completionCb.apply(updateReq, updateRes);
+
+            onDone();
+
+            return;
+        }
+
+        boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
+            !ret.emptyResult() ||
+            updateRes.nearVersion() != null ||
+            cctx.localNodeId().equals(nearNode.id());
+
+        boolean needMapping = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht());
+
+        if (needMapping) {
+            initMapping(updateRes);
+
+            needReplyToNear = true;
+        }
+
+        sendDhtRequests(nearNode, ret);
+
+        if (needReplyToNear)
+            completionCb.apply(updateReq, updateRes);
+    }
+
+    /**
+     * @param updateRes Response.
      */
-    final void map() {
+    private void initMapping(GridNearAtomicUpdateResponse updateRes) {
+        List<UUID> dhtNodes;
+
         if (!F.isEmpty(mappings)) {
-            for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
-                try {
-                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+            dhtNodes = new ArrayList<>(mappings.size());
 
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DTH update fut, sent request [futId=" + futVer +
-                            ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
-                    }
+            dhtNodes.addAll(mappings.keySet());
+        }
+        else
+            dhtNodes = Collections.emptyList();
+
+        updateRes.dhtNodes(dhtNodes);
+    }
+
+    /**
+     * @param nearNode Near node.
+     * @param ret Return value.
+     */
+    private void sendDhtRequests(ClusterNode nearNode, GridCacheReturn ret) {
+        for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
+            try {
+                assert !cctx.localNodeId().equals(req.nodeId()) : req;
+
+                if (updateReq.fullSync()) {
+                    req.nearReplyInfo(nearNode.id(), updateReq.futureId());
+
+                    if (ret.emptyResult())
+                        req.hasResult(true);
                 }
-                catch (ClusterTopologyCheckedException ignored) {
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futVer +
-                            ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
-                    }
 
-                    registerResponse(req.nodeId());
+                if (cntQryClsrs != null)
+                    req.replyWithoutDelay(true);
+
+                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("DTH update fut, sent request [futId=" + futId +
+                        ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
                 }
-                catch (IgniteCheckedException ignored) {
-                    U.error(msgLog, "Failed to send request [futId=" + futVer +
+            }
+            catch (ClusterTopologyCheckedException ignored) {
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futId +
                         ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
-
-                    registerResponse(req.nodeId());
                 }
+
+                registerResponse(req.nodeId());
             }
-        }
-        else
-            onDone();
+            catch (IgniteCheckedException ignored) {
+                U.error(msgLog, "Failed to send request [futId=" + futId +
+                    ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
 
-        // Send response right away if no ACKs from backup is required.
-        // Backups will send ACKs anyway, future will be completed after all backups have replied.
-        if (updateReq.writeSynchronizationMode() != FULL_SYNC)
-            completionCb.apply(updateReq, updateRes);
+                registerResponse(req.nodeId());
+            }
+        }
     }
 
     /**
@@ -395,7 +455,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      *
      * @param nodeId Backup node ID.
      */
-    public final void onResult(UUID nodeId) {
+    final void onDeferredResponse(UUID nodeId) {
         if (log.isDebugEnabled())
             log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
 
@@ -403,8 +463,31 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /**
-     * @param node Node.
-     * @param futVer Future version.
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    final void onDhtResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
+        if (!F.isEmpty(res.nearEvicted())) {
+            for (KeyCacheObject key : res.nearEvicted()) {
+                try {
+                    GridDhtCacheEntry entry = (GridDhtCacheEntry)cctx.cache().peekEx(key);
+
+                    if (entry != null)
+                        entry.removeReader(nodeId, res.messageId());
+                }
+                catch (GridCacheEntryRemovedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Entry with evicted reader was removed [key=" + key + ", err=" + e + ']');
+                }
+            }
+        }
+
+        registerResponse(nodeId);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param futId Future ID.
      * @param writeVer Update version.
      * @param syncMode Write synchronization mode.
      * @param topVer Topology version.
@@ -414,8 +497,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @return Request.
      */
     protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
-        ClusterNode node,
-        GridCacheVersion futVer,
+        UUID nodeId,
+        long futId,
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
@@ -424,38 +507,18 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         @Nullable GridCacheVersion conflictVer
     );
 
-    /**
-     * Callback for backup update response.
-     *
-     * @param nodeId Backup node ID.
-     * @param updateRes Update response.
-     */
-    public abstract void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes);
-
-    /**
-     * @param updateRes Response.
-     * @param err Error.
-     */
-    protected abstract void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err);
-
     /** {@inheritDoc} */
     @Override public final boolean onDone(@Nullable Void res, @Nullable Throwable err) {
         if (super.onDone(res, err)) {
-            cctx.mvcc().removeAtomicFuture(version());
+            cctx.mvcc().removeAtomicFuture(futId);
 
             boolean suc = err == null;
 
-            if (!suc)
-                addFailedKeys(updateRes, err);
-
             if (cntQryClsrs != null) {
                 for (CI1<Boolean> clsr : cntQryClsrs)
                     clsr.apply(suc);
             }
 
-            if (updateReq.writeSynchronizationMode() == FULL_SYNC)
-                completionCb.apply(updateReq, updateRes);
-
             return true;
         }
 
@@ -471,4 +534,21 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     @Override public void markNotTrackable() {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        synchronized (this) {
+            Map<UUID, String> dhtRes = F.viewReadOnly(mappings,
+                new IgniteClosure<GridDhtAtomicAbstractUpdateRequest, String>() {
+                    @Override public String apply(GridDhtAtomicAbstractUpdateRequest req) {
+                        return "[res=" + req.hasResponse() +
+                            ", size=" + req.size() +
+                            ", nearSize=" + req.nearSize() + ']';
+                    }
+                }
+            );
+
+            return S.toString(GridDhtAtomicAbstractUpdateFuture.class, this, "dhtRes", dhtRes);
+        }
+    }
 }


Mime
View raw message