ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4705
Date Wed, 01 Mar 2017 09:43:32 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4705 8e77bc744 -> 4a72759d1


ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a72759d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a72759d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a72759d

Branch: refs/heads/ignite-4705
Commit: 4a72759d10bae76788ba5a5c26ad263010130b1d
Parents: 8e77bc7
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Mar 1 12:00:35 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Mar 1 12:43:24 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMvccManager.java  |   3 +-
 .../GridCachePartitionExchangeManager.java      |  10 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |   1 +
 .../GridDhtAtomicAbstractUpdateRequest.java     | 180 ++++++++++++--
 .../dht/atomic/GridDhtAtomicCache.java          |  62 +----
 .../dht/atomic/GridDhtAtomicNearResponse.java   |   8 +
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |   4 +-
 .../GridDhtAtomicSingleUpdateRequest.java       | 186 ++------------
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   4 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  | 240 ++++---------------
 .../GridNearAtomicAbstractUpdateFuture.java     |  25 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  12 +-
 .../atomic/GridNearAtomicUpdateResponse.java    |   2 +-
 .../internal/util/ipc/IpcToNioAdapter.java      |   5 +-
 .../ignite/internal/util/nio/GridNioFilter.java |   2 +
 .../ignite/internal/util/nio/GridNioServer.java |  39 ++-
 .../internal/util/nio/GridNioSession.java       |   4 +-
 .../internal/util/nio/GridNioSessionImpl.java   |   3 +-
 .../util/nio/ssl/GridNioSslHandler.java         |   7 +-
 ...niteCacheClientNodeChangingTopologyTest.java |   4 -
 .../atomic/IgniteCacheAtomicProtocolTest.java   |   7 +-
 .../util/future/nio/GridNioFutureSelfTest.java  |   2 +-
 .../communication/HadoopMarshallerFilter.java   |   1 +
 23 files changed, 340 insertions(+), 471 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 7f9f18c..f3d3fb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -453,7 +453,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @return Collection of pending atomic futures.
+     * @return Number of pending atomic futures.
      */
     public int atomicFuturesCount() {
         return atomicFuts.size();
@@ -498,6 +498,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
 
     /**
      * @param topVer Topology version.
+     * @return Data streamer future created for the given topology version.
      */
     public GridFutureAdapter addDataStreamerFuture(AffinityTopologyVersion topVer) {
         final DataStreamerFuture fut = new DataStreamerFuture(topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f14f612..e44f4a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1349,8 +1349,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (!readyFuts.isEmpty()) {
             U.warn(log, "Pending affinity ready futures:");
 
-            int cnt = 0;
-
             for (AffinityReadyFuture fut : readyFuts.values())
                 U.warn(log, ">>> " + fut);
         }
@@ -1510,15 +1508,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             U.warn(log, "Pending atomic cache futures:");
 
-            int cnt = 0;
-
-            for (GridCacheFuture<?> fut : mvcc.atomicFutures()) {
+            for (GridCacheFuture<?> fut : mvcc.atomicFutures())
                 U.warn(log, ">>> " + fut);
 
-                if (cnt++ >= 10)
-                    break;
-            }
-
             U.warn(log, "Pending data streamer futures:");
 
             for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures())

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index e9b6b62..0877d24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -310,6 +310,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     /**
      * @param nodeId Node ID.
      * @param nodeErr Node error flag.
+     * @param errors Errors instance if DHT node failed to unmarshal message.
      * @return {@code True} if request found.
      */
     private boolean registerResponse(UUID nodeId, boolean nodeErr, UpdateErrors errors) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 9bc4f81..1d01a49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
@@ -35,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -62,6 +64,24 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
+    /** Future ID on primary. */
+    protected long futId;
+
+    /** Write version. */
+    protected GridCacheVersion writeVer;
+
+    /** Write synchronization mode. */
+    protected CacheWriteSynchronizationMode syncMode;
+
+    /** Topology version. */
+    protected AffinityTopologyVersion topVer;
+
+    /** Subject ID. */
+    protected UUID subjId;
+
+    /** Task name hash. */
+    protected int taskNameHash;
+
     /** Node ID. */
     @GridDirectTransient
     protected UUID nodeId;
@@ -98,11 +118,36 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
      * @param nearNodeId Near node ID.
      * @param nearFutId Future ID on near node.
      */
-    protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId, UUID nearNodeId, long nearFutId) {
+    protected GridDhtAtomicAbstractUpdateRequest(int cacheId,
+        UUID nodeId,
+        long futId,
+        UUID nearNodeId,
+        long nearFutId,
+        GridCacheVersion writeVer,
+        CacheWriteSynchronizationMode syncMode,
+        @NotNull AffinityTopologyVersion topVer,
+        UUID subjId,
+        int taskNameHash,
+        boolean addDepInfo,
+        boolean keepBinary,
+        boolean skipStore
+    ) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
+        this.futId = futId;
         this.nearNodeId = nearNodeId;
         this.nearFutId = nearFutId;
+        this.writeVer = writeVer;
+        this.syncMode = syncMode;
+        this.topVer = topVer;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.addDepInfo = addDepInfo;
+
+        if (skipStore)
+            setFlag(true, DHT_ATOMIC_SKIP_STORE_FLAG_MASK);
+        if (keepBinary)
+            setFlag(true, DHT_ATOMIC_KEEP_BINARY_FLAG_MASK);
     }
 
     /**
@@ -124,14 +169,14 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     /**
      * @param dhtNodes DHT nodes.
      */
-    public void dhtNodes(List<UUID> dhtNodes) {
+    void dhtNodes(List<UUID> dhtNodes) {
         this.dhtNodes = dhtNodes;
     }
 
     /**
      * @return DHT nodes.
      */
-    public List<UUID> dhtNodes() {
+    List<UUID> dhtNodes() {
         return dhtNodes;
     }
 
@@ -157,12 +202,16 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     /**
      * @return Keep binary flag.
      */
-    public abstract boolean keepBinary();
+    public final boolean keepBinary() {
+        return isFlag(DHT_ATOMIC_KEEP_BINARY_FLAG_MASK);
+    }
 
     /**
      * @return Skip write-through to a persistent storage.
      */
-    public abstract boolean skipStore();
+    public final boolean skipStore() {
+        return isFlag(DHT_ATOMIC_SKIP_STORE_FLAG_MASK);
+    }
 
     /**
      * @return {@code True} if on response flag changed.
@@ -171,6 +220,9 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
         return !onRes && (onRes = true);
     }
 
+    /**
+     * @return {@code True} if response was received.
+     */
     boolean hasResponse() {
         return onRes;
     }
@@ -238,34 +290,44 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
     /**
      * @return Subject ID.
      */
-    public abstract UUID subjectId();
+    public final UUID subjectId() {
+        return subjId;
+    }
 
     /**
      * @return Task name.
      */
-    public abstract int taskNameHash();
+    public final int taskNameHash() {
+        return taskNameHash;
+    }
 
     /**
      * @return Future ID on primary node.
      */
-    public abstract long futureId();
+    public final long futureId() {
+        return futId;
+    }
 
     /**
      * @return Future ID on near node.
      */
-    public final long nearFutureId() {
+    final long nearFutureId() {
         return nearFutId;
     }
 
     /**
      * @return Write version.
      */
-    public abstract GridCacheVersion writeVersion();
+    public final GridCacheVersion writeVersion() {
+        return writeVer;
+    }
 
     /**
      * @return Cache write synchronization mode.
      */
-    public abstract CacheWriteSynchronizationMode writeSynchronizationMode();
+    public final CacheWriteSynchronizationMode writeSynchronizationMode() {
+        return syncMode;
+    }
 
     /**
      * @return Keys size.
@@ -382,13 +444,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
      * @param mask Mask to read.
      * @return Flag value.
      */
-    protected final boolean isFlag(int mask) {
+    final boolean isFlag(int mask) {
         return (flags & mask) != 0;
     }
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 13;
     }
 
     /** {@inheritDoc} */
@@ -419,17 +481,53 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeLong("nearFutId", nearFutId))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 6:
+                if (!writer.writeLong("nearFutId", nearFutId))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
                 if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
+            case 8:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeMessage("writeVer", writeVer))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -463,7 +561,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
                 reader.incrementState();
 
             case 5:
-                nearFutId = reader.readLong("nearFutId");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -471,6 +569,14 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
                 reader.incrementState();
 
             case 6:
+                nearFutId = reader.readLong("nearFutId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
                 nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
@@ -478,6 +584,50 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
 
                 reader.incrementState();
 
+            case 8:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                byte syncModeOrd;
+
+                syncModeOrd = reader.readByte("syncMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+                reader.incrementState();
+
+            case 10:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                writeVer = reader.readMessage("writeVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDhtAtomicAbstractUpdateRequest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 933436e..ed6b184 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -108,7 +108,6 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
@@ -1813,7 +1812,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
 
-                        dhtFut = createDhtFuture(ver, req, false);
+                        dhtFut = createDhtFuture(ver, req);
 
                         expiry = expiryPolicy(req.expiry());
 
@@ -2467,7 +2466,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     dhtFut);
 
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
-                    dhtFut = createDhtFuture(ver, req, true);
+                    dhtFut = createDhtFuture(ver, req);
 
                     readersOnly = true;
                 }
@@ -2772,7 +2771,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     batchRes.addDeleted(entry, updRes, entries);
 
                     if (dhtFut == null && !F.isEmpty(filteredReaders)) {
-                        dhtFut = createDhtFuture(ver, req, true);
+                        dhtFut = createDhtFuture(ver, req);
 
                         batchRes.readersOnly(true);
                     }
@@ -2874,19 +2873,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             KeyCacheObject key = req.key(0);
 
             while (true) {
-                try {
-                    GridDhtCacheEntry entry = entryExx(key, topVer);
+                GridDhtCacheEntry entry = entryExx(key, topVer);
 
-                    GridUnsafe.monitorEnter(entry);
+                GridUnsafe.monitorEnter(entry);
 
-                    if (entry.obsolete())
-                        GridUnsafe.monitorExit(entry);
-                    else
-                        return Collections.singletonList(entry);
-                }
-                catch (GridDhtInvalidPartitionException e) {
-                    throw e;
-                }
+                if (entry.obsolete())
+                    GridUnsafe.monitorExit(entry);
+                else
+                    return Collections.singletonList(entry);
             }
         }
         else {
@@ -2894,14 +2888,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             while (true) {
                 for (int i = 0; i < req.size(); i++) {
-                    try {
-                        GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
+                    GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
 
-                        locked.add(entry);
-                    }
-                    catch (GridDhtInvalidPartitionException e) {
-                        throw e;
-                    }
+                    locked.add(entry);
                 }
 
                 boolean retry = false;
@@ -3083,43 +3072,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      *
      * @param writeVer Write version.
      * @param updateReq Update request.
-     * @param force If {@code true} then creates future without optimizations checks.
      * @return Backup update future or {@code null} if there are no backups.
      */
     @Nullable private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq,
-        boolean force
+        GridNearAtomicAbstractUpdateRequest updateReq
     ) {
         if (updateReq.size() == 1)
             return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
         else
             return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
-//        if (!force) {
-//            if (updateReq.fastMap())
-//                return null;
-//
-//            AffinityTopologyVersion topVer = updateReq.topologyVersion();
-//
-//            Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer);
-//
-//            // We are on primary node for some key.
-//            assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer +
-//                ctx.kernalContext().discovery().discoCache(topVer) + ']';
-//
-//            if (nodes.size() == 1) {
-//                if (log.isDebugEnabled())
-//                    log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " +
-//                        "[topVer=" + topVer + ", updateReq=" + updateReq + ']');
-//
-//                return null;
-//            }
-//        }
-//
-//        if (updateReq.size() == 1)
-//            return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
-//        else
-//            return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index e494288..2e9b616 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -94,6 +94,8 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
         List<UUID> mapping,
         byte flags)
     {
+        assert primaryId != null;
+
         this.cacheId = cacheId;
         this.partId = partId;
         this.futId = futId;
@@ -102,10 +104,16 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
         this.flags = flags;
     }
 
+    /**
+     * @return Update errors attached to this response, or {@code null}.
+     */
     @Nullable UpdateErrors errors() {
         return errors;
     }
 
+    /**
+     * @param errors Update errors to attach to this response.
+     */
     void errors(UpdateErrors errors) {
         this.errors = errors;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 86fbdfd..2cc370f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -102,13 +102,13 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
                 writeVer,
                 syncMode,
                 topVer,
-                false,
                 updateReq.subjectId(),
                 updateReq.taskNameHash(),
                 null,
                 cctx.deploymentEnabled(),
                 updateReq.keepBinary(),
-                updateReq.skipStore());
+                updateReq.skipStore(),
+                false);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index 77bcc26..fa7c445 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -38,9 +38,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
-
 /**
  *
  */
@@ -48,24 +45,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Future ID on primary. */
-    protected long futId;
-
-    /** Write version. */
-    protected GridCacheVersion writeVer;
-
-    /** Write synchronization mode. */
-    protected CacheWriteSynchronizationMode syncMode;
-
-    /** Topology version. */
-    protected AffinityTopologyVersion topVer;
-
-    /** Subject ID. */
-    protected UUID subjId;
-
-    /** Task name hash. */
-    protected int taskNameHash;
-
     /** Key to update. */
     @GridToStringInclude
     protected KeyCacheObject key;
@@ -118,19 +97,19 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
         boolean keepBinary,
         boolean skipStore
     ) {
-        super(cacheId, nodeId, nearNodeId, nearFutId);
-        this.futId = futId;
-        this.writeVer = writeVer;
-        this.syncMode = syncMode;
-        this.topVer = topVer;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
-        this.addDepInfo = addDepInfo;
-
-        if (skipStore)
-            setFlag(true, DHT_ATOMIC_SKIP_STORE_FLAG_MASK);
-        if (keepBinary)
-            setFlag(true, DHT_ATOMIC_KEEP_BINARY_FLAG_MASK);
+        super(cacheId,
+            nodeId,
+            futId,
+            nearNodeId,
+            nearFutId,
+            writeVer,
+            syncMode,
+            topVer,
+            subjId,
+            taskNameHash,
+            addDepInfo,
+            keepBinary,
+            skipStore);
     }
 
     /**
@@ -214,11 +193,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
-    @Override public boolean skipStore() {
-        return isFlag(DHT_ATOMIC_SKIP_STORE_FLAG_MASK);
-    }
-
-    /** {@inheritDoc} */
     @Override public KeyCacheObject key(int idx) {
         assert idx == 0 : idx;
 
@@ -252,31 +226,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
-    @Override public long futureId() {
-        return futId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion writeVersion() {
-        return writeVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int taskNameHash() {
-        return taskNameHash;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID subjectId() {
-        return subjId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
-        return syncMode;
-    }
-
-    /** {@inheritDoc} */
     @Override @Nullable public CacheObject previousValue(int idx) {
         assert idx == 0 : idx;
 
@@ -344,11 +293,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
         return null;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean keepBinary() {
-        return isFlag(DHT_ATOMIC_KEEP_BINARY_FLAG_MASK);
-    }
-
     /**
      *
      */
@@ -405,62 +349,26 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
         }
 
         switch (writer.state()) {
-            case 7:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-            case 8:
-                if (!writer.writeMessage("key", key))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeMessage("prevVal", prevVal))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
-                if (!writer.writeUuid("subjId", subjId))
-                    return false;
-
-                writer.incrementState();
-
-            case 11:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 12:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
-                    return false;
-
-                writer.incrementState();
-
             case 13:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeMessage("key", key))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeLong("updateCntr", updateCntr))
+                if (!writer.writeMessage("prevVal", prevVal))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeMessage("val", val))
+                if (!writer.writeLong("updateCntr", updateCntr))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeMessage("writeVer", writeVer))
+                if (!writer.writeMessage("val", val))
                     return false;
 
                 writer.incrementState();
@@ -481,60 +389,8 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
             return false;
 
         switch (reader.state()) {
-            case 7:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 8:
-                key = reader.readMessage("key");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                prevVal = reader.readMessage("prevVal");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 10:
-                subjId = reader.readUuid("subjId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 11:
-                byte syncModeOrd;
-
-                syncModeOrd = reader.readByte("syncMode");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
-                reader.incrementState();
-
-            case 12:
-                taskNameHash = reader.readInt("taskNameHash");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
             case 13:
-                topVer = reader.readMessage("topVer");
+                key = reader.readMessage("key");
 
                 if (!reader.isLastRead())
                     return false;
@@ -542,7 +398,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
                 reader.incrementState();
 
             case 14:
-                updateCntr = reader.readLong("updateCntr");
+                prevVal = reader.readMessage("prevVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -550,7 +406,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
                 reader.incrementState();
 
             case 15:
-                val = reader.readMessage("val");
+                updateCntr = reader.readLong("updateCntr");
 
                 if (!reader.isLastRead())
                     return false;
@@ -558,7 +414,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
                 reader.incrementState();
 
             case 16:
-                writeVer = reader.readMessage("writeVer");
+                val = reader.readMessage("val");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 22ecef8..1c12193 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -84,13 +84,13 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
             writeVer,
             syncMode,
             topVer,
-            false,
             updateReq.subjectId(),
             updateReq.taskNameHash(),
             null,
             cctx.deploymentEnabled(),
             updateReq.keepBinary(),
-            updateReq.skipStore());
+            updateReq.skipStore(),
+            false);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 8ffd9af..ef42af8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -51,15 +51,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Future ID. */
-    private long futId;
-
-    /** Write version. */
-    private GridCacheVersion writeVer;
-
-    /** Topology version. */
-    private AffinityTopologyVersion topVer;
-
     /** Keys to update. */
     @GridToStringInclude
     @GridDirectCollection(KeyCacheObject.class)
@@ -91,9 +82,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
     /** Near expire times. */
     private GridLongList nearExpireTimes;
 
-    /** Write synchronization mode. */
-    private CacheWriteSynchronizationMode syncMode;
-
     /** Near cache keys to update. */
     @GridToStringInclude
     @GridDirectCollection(KeyCacheObject.class)
@@ -130,18 +118,9 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
     /** Entry processor arguments bytes. */
     private byte[][] invokeArgsBytes;
 
-    /** Subject ID. */
-    private UUID subjId;
-
-    /** Task name hash. */
-    private int taskNameHash;
-
     /** Partition. */
     private GridLongList updateCntrs;
 
-    /** Keep binary flag. */
-    private boolean keepBinary;
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -173,32 +152,32 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
-        boolean forceTransformBackups,
         UUID subjId,
         int taskNameHash,
         Object[] invokeArgs,
         boolean addDepInfo,
         boolean keepBinary,
-        boolean skipStore
+        boolean skipStore,
+        boolean forceTransformBackups
     ) {
-        super(cacheId, nodeId, nearNodeId, nearFutId);
-
-        this.futId = futId;
-        this.writeVer = writeVer;
-        this.syncMode = syncMode;
-        this.topVer = topVer;
-        this.forceTransformBackups = forceTransformBackups;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
+        super(cacheId,
+            nodeId,
+            futId,
+            nearNodeId,
+            nearFutId,
+            writeVer,
+            syncMode,
+            topVer,
+            subjId,
+            taskNameHash,
+            addDepInfo,
+            keepBinary,
+            skipStore);
 
         assert invokeArgs == null || forceTransformBackups;
 
+        this.forceTransformBackups = forceTransformBackups;
         this.invokeArgs = invokeArgs;
-        this.addDepInfo = addDepInfo;
-        this.keepBinary = keepBinary;
-
-        if (skipStore)
-            setFlag(true, DHT_ATOMIC_SKIP_STORE_FLAG_MASK);
 
         keys = new ArrayList<>();
 
@@ -344,31 +323,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
     }
 
     /** {@inheritDoc} */
-    @Override public UUID subjectId() {
-        return subjId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int taskNameHash() {
-        return taskNameHash;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long futureId() {
-        return futId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion writeVersion() {
-        return writeVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
-        return syncMode;
-    }
-
-    /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
@@ -503,16 +457,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
     }
 
     /** {@inheritDoc} */
-    @Override public boolean keepBinary() {
-        return keepBinary;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean skipStore() {
-        return isFlag(DHT_ATOMIC_SKIP_STORE_FLAG_MASK);
-    }
-
-    /** {@inheritDoc} */
     @Override @Nullable public Object[] invokeArguments() {
         return invokeArgs;
     }
@@ -592,110 +536,74 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
         }
 
         switch (writer.state()) {
-            case 7:
-                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
-                    return false;
-
-                writer.incrementState();
-
-            case 8:
-                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
-                if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups))
-                    return false;
-
-                writer.incrementState();
-
-            case 11:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-            case 12:
-                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
-                    return false;
-
-                writer.incrementState();
-
             case 13:
-                if (!writer.writeBoolean("keepBinary", keepBinary))
+                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
+                if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeMessage("nearTtls", nearTtls))
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeMessage("nearTtls", nearTtls))
                     return false;
 
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -718,12 +626,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 writer.incrementState();
 
-            case 28:
-                if (!writer.writeMessage("writeVer", writeVer))
-                    return false;
-
-                writer.incrementState();
-
         }
 
         return true;
@@ -740,56 +642,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
             return false;
 
         switch (reader.state()) {
-            case 7:
-                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 8:
-                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 10:
-                forceTransformBackups = reader.readBoolean("forceTransformBackups");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 11:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 12:
-                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
             case 13:
-                keepBinary = reader.readBoolean("keepBinary");
+                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -797,7 +651,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 14:
-                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -805,7 +659,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 15:
-                nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
                     return false;
@@ -813,7 +667,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 16:
-                nearExpireTimes = reader.readMessage("nearExpireTimes");
+                forceTransformBackups = reader.readBoolean("forceTransformBackups");
 
                 if (!reader.isLastRead())
                     return false;
@@ -821,7 +675,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 17:
-                nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG);
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -829,7 +683,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 18:
-                nearTtls = reader.readMessage("nearTtls");
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -837,7 +691,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 19:
-                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+                nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
                     return false;
@@ -845,7 +699,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 20:
-                prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
+                nearExpireTimes = reader.readMessage("nearExpireTimes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -853,7 +707,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 21:
-                subjId = reader.readUuid("subjId");
+                nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -861,19 +715,15 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 22:
-                byte syncModeOrd;
-
-                syncModeOrd = reader.readByte("syncMode");
+                nearTtls = reader.readMessage("nearTtls");
 
                 if (!reader.isLastRead())
                     return false;
 
-                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
                 reader.incrementState();
 
             case 23:
-                taskNameHash = reader.readInt("taskNameHash");
+                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -881,7 +731,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 24:
-                topVer = reader.readMessage("topVer");
+                prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -912,14 +762,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
                 reader.incrementState();
 
-            case 28:
-                writeVer = reader.readMessage("writeVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
         }
 
         return reader.afterMessageRead(GridDhtAtomicUpdateRequest.class);
@@ -938,7 +780,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 29;
+        return 28;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 134256e..d73f029 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -337,7 +337,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @param topVer Topology version.
      * @return Future ID in case future added.
      */
-    protected final Long addAtomicFuture(AffinityTopologyVersion topVer) {
+    final Long addAtomicFuture(AffinityTopologyVersion topVer) {
+        // TODO IGNITE-4705: it seems there is no need to add the future inside the read lock.
+
         Long futId = cctx.mvcc().atomicFutureId();
 
         synchronized (mux) {
@@ -381,6 +383,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         }
 
         /**
+         * @return {@code True} if all expected responses are received.
+         */
+        private boolean finished() {
+            return mapping != null && mapping.isEmpty() && hasRes;
+        }
+
+        /**
          * @param cctx Context.
          * @param nodeIds DHT nodes.
          */
@@ -401,6 +410,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @return Request if need process primary response, {@code null} otherwise.
          */
         @Nullable GridNearAtomicAbstractUpdateRequest processPrimaryResponse(UUID nodeId) {
+            if (finished())
+                return null;
+
             if (req != null && req.nodeId().equals(nodeId) && req.response() == null)
                 return req;
 
@@ -413,6 +425,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @return {@code True} if request processing finished.
          */
         boolean onMappingReceived(GridCacheContext cctx, GridNearAtomicMappingResponse res) {
+            if (finished())
+                return false;
+
             if (mapping == null) {
                 initMapping(cctx, res.mapping());
 
@@ -428,6 +443,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @return {@code True} if request processing finished.
          */
         boolean onNodeLeft(UUID nodeId) {
+            if (finished())
+                return false;
+
             if (mapping != null && mapping.remove(nodeId)) {
                 if (mapping.isEmpty() && hasRes)
                     return true;
@@ -445,6 +463,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @return {@code True} if request processing finished.
          */
         boolean onDhtResponse(GridCacheContext cctx, UUID nodeId, GridDhtAtomicNearResponse res) {
+            if (finished())
+                return false;
+
             if (res.primaryDhtFailureResponse()) {
                 assert res.mapping() != null : res;
                 assert res.failedNodeId() != null : res;
@@ -483,6 +504,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @return {@code True} if request processing finished.
          */
         boolean onPrimaryResponse(GridCacheContext cctx, GridNearAtomicUpdateResponse res) {
+            assert !finished() : this;
+
             hasRes = true;
 
             boolean onRes = req.onResponse(res);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 9d60a74..89b2573 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -287,7 +287,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
         GridCacheReturn opRes0;
         CachePartialUpdateCheckedException err0;
-        AffinityTopologyVersion remapTopVer0 = null;
+        AffinityTopologyVersion remapTopVer0;
 
         synchronized (mux) {
             if (futId == null || futId != res.futureId())
@@ -577,7 +577,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         });
     }
 
-    private AffinityTopologyVersion onAllReceived() {
+    /**
+     * @return Non-null topology version if the update should be remapped, {@code null} otherwise.
+     */
+    @Nullable private AffinityTopologyVersion onAllReceived() {
         AffinityTopologyVersion remapTopVer0 = null;
 
         if (remapKeys != null) {
@@ -833,7 +836,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes,
                     topVer,
                     futId,
-                    null,
                     remapKeys);
 
                 if (pendingMappings.size() == 1)
@@ -899,7 +901,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @param topNodes Cache nodes.
      * @param topVer Topology version.
      * @param futId Future ID.
-     * @param updVer Update version.
      * @param remapKeys Keys to remap.
      * @return Mapping.
      * @throws Exception If failed.
@@ -908,7 +909,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes,
         AffinityTopologyVersion topVer,
         Long futId,
-        @Nullable GridCacheVersion updVer,
         @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
         Iterator<?> it = null;
 
@@ -996,7 +996,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     nodeId,
                     futId,
                     false,
-                    updVer,
+                    null,
                     topVer,
                     topLocked,
                     syncMode,

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index b134c53..76f2ea0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -236,7 +236,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param ttl TTL for near cache update.
      * @param expireTime Expire time for near cache update.
      */
-    public void addNearValue(int keyIdx,
+    void addNearValue(int keyIdx,
         @Nullable CacheObject val,
         long ttl,
         long expireTime) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index a4a7838..7af6139 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -203,7 +203,10 @@ public class IpcToNioAdapter<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut, IgniteInClosure<IgniteException> ackC) {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+            Object msg,
+            boolean fut,
+            IgniteInClosure<IgniteException> ackC) {
             assert ses == IpcToNioAdapter.this.ses;
 
             return send((Message)msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
index 6d3d350..9163a4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
@@ -108,6 +108,7 @@ public interface GridNioFilter {
      * @param ses Session instance.
      * @param msg Message to send.
      * @param fut {@code True} if write future should be created.
+     * @param ackC Closure invoked when message ACK is received.
      * @return Write future or {@code null}.
      * @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter.
      */
@@ -158,6 +159,7 @@ public interface GridNioFilter {
      * @param ses Session on which message should be written.
      * @param msg Message being written.
      * @param fut {@code True} if write future should be created.
+     * @param ackC Closure invoked when message ACK is received.
      * @return Write future or {@code null}.
      * @throws GridNioException If GridNioException occurred while handling event.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 0ee1691..dae8d6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -480,9 +480,13 @@ public class GridNioServer<T> {
      * @param ses Session.
      * @param msg Message.
      * @param createFut {@code True} if future should be created.
+     * @param ackC Closure invoked when message ACK is received.
      * @return Future for operation.
      */
-    GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+    GridNioFuture<?> send(GridNioSession ses,
+        ByteBuffer msg,
+        boolean createFut,
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
         assert ses instanceof GridSelectorNioSessionImpl : ses;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
@@ -507,9 +511,13 @@ public class GridNioServer<T> {
      * @param ses Session.
      * @param msg Message.
      * @param createFut {@code True} if future should be created.
+     * @param ackC Closure invoked when message ACK is received.
      * @return Future for operation.
      */
-    GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+    GridNioFuture<?> send(GridNioSession ses,
+        Message msg,
+        boolean createFut,
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
         assert ses instanceof GridSelectorNioSessionImpl;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
@@ -541,8 +549,6 @@ public class GridNioServer<T> {
         assert ses != null;
         assert req != null;
 
-        IgniteInClosure<IgniteException> ackC;
-
         int msgCnt = sys ? ses.offerSystemFuture(req) : ses.offerFuture(req);
 
         if (ses.closed()) {
@@ -2662,8 +2668,12 @@ public class GridNioServer<T> {
          * @param ses Session.
          * @param msg Message.
          * @param skipRecovery Skip recovery flag.
+         * @param ackC Closure invoked when message ACK is received.
          */
-        WriteRequestImpl(GridNioSession ses, Object msg, boolean skipRecovery, IgniteInClosure<IgniteException> ackC) {
+        WriteRequestImpl(GridNioSession ses,
+            Object msg,
+            boolean skipRecovery,
+            IgniteInClosure<IgniteException> ackC) {
             this.ses = ses;
             this.msg = msg;
             this.skipRecovery = skipRecovery;
@@ -2818,8 +2828,12 @@ public class GridNioServer<T> {
          * @param ses Session to change.
          * @param op Requested operation.
          * @param msg Message.
+         * @param ackC Closure invoked when message ACK is received.
          */
-        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Object msg, IgniteInClosure<IgniteException> ackC) {
+        NioOperationFuture(GridSelectorNioSessionImpl ses,
+            NioOperation op,
+            Object msg,
+            IgniteInClosure<IgniteException> ackC) {
             super(ackC);
 
             assert ses != null;
@@ -2839,9 +2853,13 @@ public class GridNioServer<T> {
          * @param op Requested operation.
          * @param commMsg Direct message.
          * @param skipRecovery Skip recovery flag.
+         * @param ackC Closure invoked when message ACK is received.
          */
-        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
-            Message commMsg, boolean skipRecovery, IgniteInClosure<IgniteException> ackC) {
+        NioOperationFuture(GridSelectorNioSessionImpl ses,
+            NioOperation op,
+            Message commMsg,
+            boolean skipRecovery,
+            IgniteInClosure<IgniteException> ackC) {
             super(ackC);
 
             assert ses != null;
@@ -3009,7 +3027,10 @@ public class GridNioServer<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+            Object msg,
+            boolean fut,
+            IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
             if (directMode) {
                 boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index aa2c241..21eabf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -110,8 +110,10 @@ public interface GridNioSession {
     /**
      * @param msg Message to be sent.
      * @param ackC Optional closure invoked when ack for message is received.
+     * @throws IgniteCheckedException If failed.
      */
-    public void sendNoFuture(Object msg, @Nullable IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException;
+    public void sendNoFuture(Object msg, @Nullable IgniteInClosure<IgniteException> ackC)
+        throws IgniteCheckedException;
 
     /**
      * Gets metadata associated with specified key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 52fb591..98a22d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -117,7 +117,8 @@ public class GridNioSessionImpl implements GridNioSession {
     }
 
     /** {@inheritDoc} */
-    @Override public void sendNoFuture(Object msg, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+    @Override public void sendNoFuture(Object msg, IgniteInClosure<IgniteException> ackC)
+        throws IgniteCheckedException {
         try {
             chain().onSessionWrite(this, msg, false, ackC);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
index 1d44f9d..e268716 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
@@ -414,6 +414,7 @@ class GridNioSslHandler extends ReentrantLock {
      * Adds write request to the queue.
      *
      * @param buf Buffer to write.
+     * @param ackC Closure invoked when message ACK is received.
      * @return Write future.
      */
     GridNioFuture<?> deferredWrite(ByteBuffer buf, IgniteInClosure<IgniteException> ackC) {
@@ -477,6 +478,7 @@ class GridNioSslHandler extends ReentrantLock {
      * Copies data from out net buffer and passes it to the underlying chain.
      *
+     * @param ackC Closure invoked when message ACK is received.
      * @return Write future.
      * @throws GridNioException If send failed.
      */
     GridNioFuture<?> writeNetBuffer(IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
@@ -685,8 +687,11 @@ class GridNioSslHandler extends ReentrantLock {
          *
          * @param fut Future.
          * @param buf Buffer to write.
+         * @param ackC Closure invoked when message ACK is received.
          */
-        private WriteRequest(GridNioEmbeddedFuture<Object> fut, ByteBuffer buf, IgniteInClosure<IgniteException> ackC) {
+        private WriteRequest(GridNioEmbeddedFuture<Object> fut,
+            ByteBuffer buf,
+            IgniteInClosure<IgniteException> ackC) {
             this.fut = fut;
             this.buf = buf;
             this.ackC = ackC;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index e3c45be..cbb2b8a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -1694,8 +1694,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
      * @throws Exception If failed.
      */
     public void testAtomicPrimaryPutAllMultinode() throws Exception {
-        //fail("https://issues.apache.org/jira/browse/IGNITE-1685");
-
         multinode(PRIMARY, TestType.PUT_ALL);
     }
 
@@ -1703,8 +1701,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
      * @throws Exception If failed.
      */
     public void testAtomicClockPutAllMultinode() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1685");
-
         multinode(CLOCK, TestType.PUT_ALL);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index d26f11b..df1938b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -291,8 +291,11 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
         for (Ignite node : nodes) {
             IgniteCache<Integer, Integer> cache = node.cache(TEST_CACHE);
 
-            for (Map.Entry<Integer, Integer> e : expData.entrySet())
-                assertEquals(e.getValue(), cache.get(e.getKey()));
+            for (Map.Entry<Integer, Integer> e : expData.entrySet()) {
+                assertEquals("Invalid value [key=" + e.getKey() + ", node=" + node.name() + ']',
+                    e.getValue(),
+                    cache.get(e.getKey()));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java
index cc1294b..44a1eff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java
@@ -85,7 +85,7 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testOnCancelled() throws Exception {
         GridTestUtils.assertThrows(log, new Callable<Object>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a72759d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
index 894a5f9..d90a900 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
@@ -68,6 +68,7 @@ public class HadoopMarshallerFilter extends GridNioFilterAdapter {
         return proceedSessionWrite(ses, U.marshal(marsh, msg), fut, ackC);
     }
 
+    /** {@inheritDoc} */
     @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
         assert msg instanceof byte[];
 


Mime
View raw message