ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [20/51] [abbrv] ignite git commit: ignite-2523 : Created SingleUpdateResponse
Date Thu, 25 Feb 2016 12:31:19 GMT
ignite-2523 : Created SingleUpdateResponse


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dfdd2f4f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dfdd2f4f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dfdd2f4f

Branch: refs/heads/ignite-2523
Commit: dfdd2f4f907651a86da6fd52f18d2f189f65279d
Parents: a18c352
Author: Ilya Lantukh <ilantukh@gridgain.com>
Authored: Mon Feb 8 17:23:02 2016 +0300
Committer: Ilya Lantukh <ilantukh@gridgain.com>
Committed: Mon Feb 8 17:23:02 2016 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  10 +-
 .../processors/cache/GridCacheIoManager.java    |  10 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  38 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   8 +-
 .../GridNearAtomicMultipleUpdateRequest.java    |   6 +-
 .../GridNearAtomicMultipleUpdateResponse.java   | 640 ++++++++++++++++++
 .../GridNearAtomicSingleUpdateRequest.java      |   6 +-
 .../GridNearAtomicSingleUpdateResponse.java     | 641 +++++++++++++++++++
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  22 +-
 .../dht/atomic/GridNearAtomicUpdateRequest.java |   4 +-
 .../atomic/GridNearAtomicUpdateResponse.java    | 616 ++----------------
 .../distributed/near/GridNearAtomicCache.java   |   4 +-
 .../IgniteClientReconnectCacheTest.java         |   4 +-
 .../IgniteClientReconnectCollectionsTest.java   |   6 +-
 14 files changed, 1382 insertions(+), 633 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 88e34c9..4d769af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -68,9 +68,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlock
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -368,7 +369,7 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case 41:
-                msg = new GridNearAtomicUpdateResponse();
+                msg = new GridNearAtomicMultipleUpdateResponse();
 
                 break;
 
@@ -732,6 +733,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case -24:
+                msg = new GridNearAtomicSingleUpdateResponse();
+
+                break;
+
             // [-3..119] [124] - this
             // [120..123] - DR
             // [-4..-22] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 8a8f161..ea97277 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -46,7 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -412,7 +412,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             case 40: {
                 GridNearAtomicMultipleUpdateRequest req = (GridNearAtomicMultipleUpdateRequest)msg;
 
-                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+                GridNearAtomicMultipleUpdateResponse res = new GridNearAtomicMultipleUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
                     req.futureVersion(),
@@ -443,7 +443,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             break;
 
             case 45: {
-                processMessage(nodeId,msg,c);// Will be handled by Rebalance Demander.
+                processMessage(nodeId, msg, c);// Will be handled by Rebalance Demander.
             }
 
             break;
@@ -544,7 +544,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             break;
 
             case 114: {
-                processMessage(nodeId,msg,c);// Will be handled by Rebalance Demander.
+                processMessage(nodeId, msg, c);// Will be handled by Rebalance Demander.
             }
 
             break;
@@ -590,7 +590,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             case -23: {
                 GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
 
-                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+                GridNearAtomicMultipleUpdateResponse res = new GridNearAtomicMultipleUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
                     req.futureVersion(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 55db70a..b0504db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -139,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
 
     /** Update reply closure. */
-    private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
+    private CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> updateReplyClos;
 
     /** Pending  */
     private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
@@ -192,9 +192,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
-        updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+        updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse>() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-            @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+            @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicMultipleUpdateResponse res) {
                 if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
                     assert req.writeSynchronizationMode() != FULL_ASYNC : req;
 
@@ -262,8 +262,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
-            @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
+        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicMultipleUpdateResponse.class, new CI2<UUID, GridNearAtomicMultipleUpdateResponse>() {
+            @Override public void apply(UUID nodeId, GridNearAtomicMultipleUpdateResponse res) {
                 processNearAtomicUpdateResponse(nodeId, res);
             }
         });
@@ -1311,7 +1311,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     public void updateAllAsyncInternal(
         final UUID nodeId,
         final GridNearAtomicUpdateRequest req,
-        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb
     ) {
         IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
 
@@ -1336,9 +1336,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     public void updateAllAsyncInternal0(
         UUID nodeId,
         GridNearAtomicUpdateRequest req,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb
     ) {
-        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
+        GridNearAtomicMultipleUpdateResponse res = new GridNearAtomicMultipleUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
             ctx.deploymentEnabled());
 
         List<KeyCacheObject> keys = req.keys();
@@ -1559,11 +1559,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         ClusterNode node,
         boolean hasNear,
         GridNearAtomicUpdateRequest req,
-        GridNearAtomicUpdateResponse res,
+        GridNearAtomicMultipleUpdateResponse res,
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
@@ -1975,11 +1975,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         ClusterNode node,
         boolean hasNear,
         GridNearAtomicUpdateRequest req,
-        GridNearAtomicUpdateResponse res,
+        GridNearAtomicMultipleUpdateResponse res,
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
@@ -2214,9 +2214,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable Collection<KeyCacheObject> rmvKeys,
         @Nullable Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
         final GridNearAtomicUpdateRequest req,
-        final GridNearAtomicUpdateResponse res,
+        final GridNearAtomicMultipleUpdateResponse res,
         boolean replicate,
         UpdateBatchResult batchRes,
         String taskName,
@@ -2594,7 +2594,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @return {@code True} if filter evaluation succeeded.
      */
     private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req,
-        GridNearAtomicUpdateResponse res) {
+        GridNearAtomicMultipleUpdateResponse res) {
         try {
             return ctx.isAllLocked(entry, req.filter());
         }
@@ -2688,8 +2688,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
         GridNearAtomicUpdateRequest updateReq,
-        GridNearAtomicUpdateResponse updateRes,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        GridNearAtomicMultipleUpdateResponse updateRes,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
         boolean force
     ) {
         if (!force) {
@@ -2734,7 +2734,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param res Near atomic update response.
      */
     @SuppressWarnings("unchecked")
-    private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
+    private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicMultipleUpdateResponse res) {
         if (log.isDebugEnabled())
             log.debug("Processing near atomic update response [nodeId=" + nodeId + ", res=" + res + ']');
 
@@ -2944,7 +2944,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param nodeId Originating node ID.
      * @param res Near update response.
      */
-    private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse res) {
+    private void sendNearUpdateReply(UUID nodeId, GridNearAtomicMultipleUpdateResponse res) {
         try {
             ctx.io().send(nodeId, res, ctx.ioPolicy());
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 3a31700..68c639d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -77,7 +77,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
 
     /** Completion callback. */
     @GridToStringExclude
-    private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+    private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb;
 
     /** Mappings. */
     @GridToStringInclude
@@ -90,7 +90,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
     private final GridNearAtomicUpdateRequest updateReq;
 
     /** Update response. */
-    private final GridNearAtomicUpdateResponse updateRes;
+    private final GridNearAtomicMultipleUpdateResponse updateRes;
 
     /** Future keys. */
     private final Collection<KeyCacheObject> keys;
@@ -110,10 +110,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
      */
     public GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
         GridCacheVersion writeVer,
         GridNearAtomicUpdateRequest updateReq,
-        GridNearAtomicUpdateResponse updateRes
+        GridNearAtomicMultipleUpdateResponse updateRes
     ) {
         this.cctx = cctx;
         this.writeVer = writeVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
index 650d350..6f109be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
@@ -157,7 +157,7 @@ public class GridNearAtomicMultipleUpdateRequest extends GridCacheMessage
 
     /** */
     @GridDirectTransient
-    private GridNearAtomicUpdateResponse res;
+    private GridNearAtomicMultipleUpdateResponse res;
 
     /** Maximum possible size of inner collections. */
     @GridDirectTransient
@@ -502,7 +502,7 @@ public class GridNearAtomicMultipleUpdateRequest extends GridCacheMessage
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
+    @Override public boolean onResponse(GridNearAtomicMultipleUpdateResponse res) {
         if (this.res == null) {
             this.res = res;
 
@@ -513,7 +513,7 @@ public class GridNearAtomicMultipleUpdateRequest extends GridCacheMessage
     }
 
     /** {@inheritDoc} */
-    @Override @Nullable public GridNearAtomicUpdateResponse response() {
+    @Override @Nullable public GridNearAtomicMultipleUpdateResponse response() {
         return res;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateResponse.java
new file mode 100644
index 0000000..d22acc4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateResponse.java
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * DHT atomic cache near update response.
+ */
+public class GridNearAtomicMultipleUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridNearAtomicUpdateResponse {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    /** Node ID this reply should be sent to. */
+    @GridDirectTransient
+    private UUID nodeId;
+
+    /** Future version. */
+    private GridCacheVersion futVer;
+
+    /** Update error. */
+    @GridDirectTransient
+    private volatile IgniteCheckedException err;
+
+    /** Serialized error. */
+    private byte[] errBytes;
+
+    /** Return value. */
+    @GridToStringInclude
+    private GridCacheReturn ret;
+
+    /** Failed keys. */
+    @GridToStringInclude
+    @GridDirectCollection(KeyCacheObject.class)
+    private volatile Collection<KeyCacheObject> failedKeys;
+
+    /** Keys that should be remapped. */
+    @GridToStringInclude
+    @GridDirectCollection(KeyCacheObject.class)
+    private List<KeyCacheObject> remapKeys;
+
+    /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
+    @GridDirectCollection(int.class)
+    private List<Integer> nearValsIdxs;
+
+    /** Indexes of keys for which update was skipped (used if originating node has near cache). */
+    @GridDirectCollection(int.class)
+    private List<Integer> nearSkipIdxs;
+
+    /** Values generated on primary node which should be put to originating node's near cache. */
+    @GridToStringInclude
+    @GridDirectCollection(CacheObject.class)
+    private List<CacheObject> nearVals;
+
+    /** Version generated on primary node to be used for originating node's near cache update. */
+    private GridCacheVersion nearVer;
+
+    /** Near TTLs. */
+    private GridLongList nearTtls;
+
+    /** Near expire times. */
+    private GridLongList nearExpireTimes;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridNearAtomicMultipleUpdateResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID this reply should be sent to.
+     * @param futVer Future version.
+     * @param addDepInfo Deployment info flag.
+     */
+    public GridNearAtomicMultipleUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) {
+        assert futVer != null;
+
+        this.cacheId = cacheId;
+        this.nodeId = nodeId;
+        this.futVer = futVer;
+        this.addDepInfo = addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /**
+     * @return Node ID this response should be sent to.
+     */
+    @Override public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     */
+    @Override public void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return Future version.
+     */
+    @Override public GridCacheVersion futureVersion() {
+        return futVer;
+    }
+
+    /**
+     * Sets update error.
+     *
+     * @param err Error.
+     */
+    @Override public void error(IgniteCheckedException err){
+        this.err = err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCheckedException error() {
+        return err;
+    }
+
+    /**
+     * @return Collection of failed keys.
+     */
+    @Override public Collection<KeyCacheObject> failedKeys() {
+        return failedKeys;
+    }
+
+    /**
+     * @return Return value.
+     */
+    @Override public GridCacheReturn returnValue() {
+        return ret;
+    }
+
+    /**
+     * @param ret Return value.
+     */
+    @Override @SuppressWarnings("unchecked")
+    public void returnValue(GridCacheReturn ret) {
+        this.ret = ret;
+    }
+
+    /**
+     * @param remapKeys Remap keys.
+     */
+    @Override public void remapKeys(List<KeyCacheObject> remapKeys) {
+        this.remapKeys = remapKeys;
+    }
+
+    /**
+     * @return Remap keys.
+     */
+    @Override public Collection<KeyCacheObject> remapKeys() {
+        return remapKeys;
+    }
+
+    /**
+     * Adds value to be put in near cache on originating node.
+     *
+     * @param keyIdx Key index.
+     * @param val Value.
+     * @param ttl TTL for near cache update.
+     * @param expireTime Expire time for near cache update.
+     */
+    @Override public void addNearValue(int keyIdx,
+        @Nullable CacheObject val,
+        long ttl,
+        long expireTime) {
+        if (nearValsIdxs == null) {
+            nearValsIdxs = new ArrayList<>();
+            nearVals = new ArrayList<>();
+        }
+
+        addNearTtl(keyIdx, ttl, expireTime);
+
+        nearValsIdxs.add(keyIdx);
+        nearVals.add(val);
+    }
+
+    /**
+     * @param keyIdx Key index.
+     * @param ttl TTL for near cache update.
+     * @param expireTime Expire time for near cache update.
+     */
+    @Override @SuppressWarnings("ForLoopReplaceableByForEach")
+    public void addNearTtl(int keyIdx, long ttl, long expireTime) {
+        if (ttl >= 0) {
+            if (nearTtls == null) {
+                nearTtls = new GridLongList(16);
+
+                for (int i = 0; i < keyIdx; i++)
+                    nearTtls.add(-1L);
+            }
+        }
+
+        if (nearTtls != null)
+            nearTtls.add(ttl);
+
+        if (expireTime >= 0) {
+            if (nearExpireTimes == null) {
+                nearExpireTimes = new GridLongList(16);
+
+                for (int i = 0; i < keyIdx; i++)
+                    nearExpireTimes.add(-1);
+            }
+        }
+
+        if (nearExpireTimes != null)
+            nearExpireTimes.add(expireTime);
+    }
+
+    /**
+     * @param idx Index.
+     * @return Expire time for near cache update.
+     */
+    @Override public long nearExpireTime(int idx) {
+        if (nearExpireTimes != null) {
+            assert idx >= 0 && idx < nearExpireTimes.size();
+
+            return nearExpireTimes.get(idx);
+        }
+
+        return -1L;
+    }
+
+    /**
+     * @param idx Index.
+     * @return TTL for near cache update.
+     */
+    @Override public long nearTtl(int idx) {
+        if (nearTtls != null) {
+            assert idx >= 0 && idx < nearTtls.size();
+
+            return nearTtls.get(idx);
+        }
+
+        return -1L;
+    }
+
+    /**
+     * @param nearVer Version generated on primary node to be used for originating node's near cache update.
+     */
+    @Override public void nearVersion(GridCacheVersion nearVer) {
+        this.nearVer = nearVer;
+    }
+
+    /**
+     * @return Version generated on primary node to be used for originating node's near cache update.
+     */
+    @Override public GridCacheVersion nearVersion() {
+        return nearVer;
+    }
+
+    /**
+     * @param keyIdx Index of key for which update was skipped
+     */
+    @Override public void addSkippedIndex(int keyIdx) {
+        if (nearSkipIdxs == null)
+            nearSkipIdxs = new ArrayList<>();
+
+        nearSkipIdxs.add(keyIdx);
+
+        addNearTtl(keyIdx, -1L, -1L);
+    }
+
+    /**
+     * @return Indexes of keys for which update was skipped
+     */
+    @Override @Nullable public List<Integer> skippedIndexes() {
+        return nearSkipIdxs;
+    }
+
+    /**
+     * @return Indexes of keys for which values were generated on primary node.
+     */
+   @Override @Nullable public List<Integer> nearValuesIndexes() {
+        return nearValsIdxs;
+   }
+
+    /**
+     * @param idx Index.
+     * @return Value generated on primary node which should be put to originating node's near cache.
+     */
+    @Override @Nullable public CacheObject nearValue(int idx) {
+        return nearVals.get(idx);
+    }
+
+    /**
+     * Adds key to collection of failed keys.
+     *
+     * @param key Key to add.
+     * @param e Error cause.
+     */
+    @Override public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
+        if (failedKeys == null)
+            failedKeys = new ConcurrentLinkedQueue<>();
+
+        failedKeys.add(key);
+
+        if (err == null)
+            err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+        err.addSuppressed(e);
+    }
+
+    /**
+     * Adds keys to collection of failed keys.
+     *
+     * @param keys Key to add.
+     * @param e Error cause.
+     */
+    @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
+        if (keys != null) {
+            if (failedKeys == null)
+                failedKeys = new ArrayList<>(keys.size());
+
+            failedKeys.addAll(keys);
+        }
+
+        if (err == null)
+            err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+        err.addSuppressed(e);
+    }
+
+    /**
+     * Adds keys to collection of failed keys.
+     *
+     * @param keys Key to add.
+     * @param e Error cause.
+     * @param ctx Context.
+     */
+    @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e, GridCacheContext ctx) {
+        if (failedKeys == null)
+            failedKeys = new ArrayList<>(keys.size());
+
+        failedKeys.addAll(keys);
+
+        if (err == null)
+            err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+        err.addSuppressed(e);
+    }
+
+    /** {@inheritDoc}
+     * @param ctx*/
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        if (err != null && errBytes == null)
+            errBytes = ctx.marshaller().marshal(err);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        prepareMarshalCacheObjects(failedKeys, cctx);
+
+        prepareMarshalCacheObjects(remapKeys, cctx);
+
+        prepareMarshalCacheObjects(nearVals, cctx);
+
+        if (ret != null)
+            ret.prepareMarshal(cctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (errBytes != null && err == null)
+            err = ctx.marshaller().unmarshal(errBytes, ldr);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
+
+        finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
+
+        finishUnmarshalCacheObjects(nearVals, cctx, ldr);
+
+        if (ret != null)
+            ret.finishUnmarshal(cctx, ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeMessage("futVer", futVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeMessage("nearTtls", nearTtls))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeMessage("nearVer", nearVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeMessage("ret", ret))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                futVer = reader.readMessage("futVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                nearExpireTimes = reader.readMessage("nearExpireTimes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                nearTtls = reader.readMessage("nearTtls");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                nearVer = reader.readMessage("nearVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                ret = reader.readMessage("ret");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicMultipleUpdateResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 41;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 14;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicMultipleUpdateResponse.class, this, "parent");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 1e981af..8714010 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -149,7 +149,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
 
     /** */
     @GridDirectTransient
-    private GridNearAtomicUpdateResponse res;
+    private GridNearAtomicMultipleUpdateResponse res;
 
     /**
      * Empty constructor required by {@link Externalizable}.
@@ -427,7 +427,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
+    @Override public boolean onResponse(GridNearAtomicMultipleUpdateResponse res) {
         if (this.res == null) {
             this.res = res;
 
@@ -438,7 +438,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
     }
 
     /** {@inheritDoc} */
-    @Override @Nullable public GridNearAtomicUpdateResponse response() {
+    @Override @Nullable public GridNearAtomicMultipleUpdateResponse response() {
         return res;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
new file mode 100644
index 0000000..581c33b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridNearAtomicUpdateResponse {
+
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    /** Node ID this reply should be sent to. */
+    @GridDirectTransient
+    private UUID nodeId;
+
+    /** Future version. */
+    private GridCacheVersion futVer;
+
+    /** Update error. */
+    @GridDirectTransient
+    private volatile IgniteCheckedException err;
+
+    /** Serialized error. */
+    private byte[] errBytes;
+
+    /** Return value. */
+    @GridToStringInclude
+    private GridCacheReturn ret;
+
+    /** Failed keys. */
+    @GridToStringInclude
+    @GridDirectCollection(KeyCacheObject.class)
+    private volatile Collection<KeyCacheObject> failedKeys;
+
+    /** Keys that should be remapped. */
+    @GridToStringInclude
+    @GridDirectCollection(KeyCacheObject.class)
+    private List<KeyCacheObject> remapKeys;
+
+    /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
+    @GridDirectCollection(int.class)
+    private List<Integer> nearValsIdxs;
+
+    /** Indexes of keys for which update was skipped (used if originating node has near cache). */
+    @GridDirectCollection(int.class)
+    private List<Integer> nearSkipIdxs;
+
+    /** Values generated on primary node which should be put to originating node's near cache. */
+    @GridToStringInclude
+    @GridDirectCollection(CacheObject.class)
+    private List<CacheObject> nearVals;
+
+    /** Version generated on primary node to be used for originating node's near cache update. */
+    private GridCacheVersion nearVer;
+
+    /** Near TTLs. */
+    private GridLongList nearTtls;
+
+    /** Near expire times. */
+    private GridLongList nearExpireTimes;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridNearAtomicSingleUpdateResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID this reply should be sent to.
+     * @param futVer Future version.
+     * @param addDepInfo Deployment info flag.
+     */
+    public GridNearAtomicSingleUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) {
+        assert futVer != null;
+
+        this.cacheId = cacheId;
+        this.nodeId = nodeId;
+        this.futVer = futVer;
+        this.addDepInfo = addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /**
+     * @return Node ID this response should be sent to.
+     */
+    @Override public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     */
+    @Override public void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return Future version.
+     */
+    @Override public GridCacheVersion futureVersion() {
+        return futVer;
+    }
+
+    /**
+     * Sets update error.
+     *
+     * @param err Error.
+     */
+    @Override public void error(IgniteCheckedException err) {
+        this.err = err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCheckedException error() {
+        return err;
+    }
+
+    /**
+     * @return Collection of failed keys.
+     */
+    @Override public Collection<KeyCacheObject> failedKeys() {
+        return failedKeys;
+    }
+
+    /**
+     * @return Return value.
+     */
+    @Override public GridCacheReturn returnValue() {
+        return ret;
+    }
+
+    /**
+     * @param ret Return value.
+     */
+    @Override @SuppressWarnings("unchecked")
+    public void returnValue(GridCacheReturn ret) {
+        this.ret = ret;
+    }
+
+    /**
+     * @param remapKeys Remap keys.
+     */
+    @Override public void remapKeys(List<KeyCacheObject> remapKeys) {
+        this.remapKeys = remapKeys;
+    }
+
+    /**
+     * @return Remap keys.
+     */
+    @Override public Collection<KeyCacheObject> remapKeys() {
+        return remapKeys;
+    }
+
+    /**
+     * Adds value to be put in near cache on originating node.
+     *
+     * @param keyIdx Key index.
+     * @param val Value.
+     * @param ttl TTL for near cache update.
+     * @param expireTime Expire time for near cache update.
+     */
+    @Override public void addNearValue(int keyIdx,
+        @Nullable CacheObject val,
+        long ttl,
+        long expireTime) {
+        if (nearValsIdxs == null) {
+            nearValsIdxs = new ArrayList<>();
+            nearVals = new ArrayList<>();
+        }
+
+        addNearTtl(keyIdx, ttl, expireTime);
+
+        nearValsIdxs.add(keyIdx);
+        nearVals.add(val);
+    }
+
+    /**
+     * @param keyIdx Key index.
+     * @param ttl TTL for near cache update.
+     * @param expireTime Expire time for near cache update.
+     */
+    @Override @SuppressWarnings("ForLoopReplaceableByForEach")
+    public void addNearTtl(int keyIdx, long ttl, long expireTime) {
+        if (ttl >= 0) {
+            if (nearTtls == null) {
+                nearTtls = new GridLongList(16);
+
+                for (int i = 0; i < keyIdx; i++)
+                    nearTtls.add(-1L);
+            }
+        }
+
+        if (nearTtls != null)
+            nearTtls.add(ttl);
+
+        if (expireTime >= 0) {
+            if (nearExpireTimes == null) {
+                nearExpireTimes = new GridLongList(16);
+
+                for (int i = 0; i < keyIdx; i++)
+                    nearExpireTimes.add(-1);
+            }
+        }
+
+        if (nearExpireTimes != null)
+            nearExpireTimes.add(expireTime);
+    }
+
+    /**
+     * @param idx Index.
+     * @return Expire time for near cache update.
+     */
+    @Override public long nearExpireTime(int idx) {
+        if (nearExpireTimes != null) {
+            assert idx >= 0 && idx < nearExpireTimes.size();
+
+            return nearExpireTimes.get(idx);
+        }
+
+        return -1L;
+    }
+
+    /**
+     * @param idx Index.
+     * @return TTL for near cache update.
+     */
+    @Override public long nearTtl(int idx) {
+        if (nearTtls != null) {
+            assert idx >= 0 && idx < nearTtls.size();
+
+            return nearTtls.get(idx);
+        }
+
+        return -1L;
+    }
+
+    /**
+     * @param nearVer Version generated on primary node to be used for originating node's near cache update.
+     */
+    @Override public void nearVersion(GridCacheVersion nearVer) {
+        this.nearVer = nearVer;
+    }
+
+    /**
+     * @return Version generated on primary node to be used for originating node's near cache update.
+     */
+    @Override public GridCacheVersion nearVersion() {
+        return nearVer;
+    }
+
+    /**
+     * @param keyIdx Index of key for which update was skipped
+     */
+    @Override public void addSkippedIndex(int keyIdx) {
+        if (nearSkipIdxs == null)
+            nearSkipIdxs = new ArrayList<>();
+
+        nearSkipIdxs.add(keyIdx);
+
+        addNearTtl(keyIdx, -1L, -1L);
+    }
+
+    /**
+     * @return Indexes of keys for which update was skipped
+     */
+    @Override @Nullable public List<Integer> skippedIndexes() {
+        return nearSkipIdxs;
+    }
+
+    /**
+     * @return Indexes of keys for which values were generated on primary node.
+     */
+    @Override @Nullable public List<Integer> nearValuesIndexes() {
+        return nearValsIdxs;
+    }
+
+    /**
+     * @param idx Index.
+     * @return Value generated on primary node which should be put to originating node's near cache.
+     */
+    @Override @Nullable public CacheObject nearValue(int idx) {
+        return nearVals.get(idx);
+    }
+
+    /**
+     * Adds key to collection of failed keys.
+     *
+     * @param key Key to add.
+     * @param e Error cause.
+     */
+    @Override public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
+        if (failedKeys == null)
+            failedKeys = new ConcurrentLinkedQueue<>();
+
+        failedKeys.add(key);
+
+        if (err == null)
+            err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+        err.addSuppressed(e);
+    }
+
+    /**
+     * Adds keys to collection of failed keys.
+     *
+     * @param keys Key to add.
+     * @param e Error cause.
+     */
+    @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
+        if (keys != null) {
+            if (failedKeys == null)
+                failedKeys = new ArrayList<>(keys.size());
+
+            failedKeys.addAll(keys);
+        }
+
+        if (err == null)
+            err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+        err.addSuppressed(e);
+    }
+
+    /**
+     * Adds keys to collection of failed keys.
+     *
+     * @param keys Key to add.
+     * @param e Error cause.
+     * @param ctx Context.
+     */
+    @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e,
+        GridCacheContext ctx) {
+        if (failedKeys == null)
+            failedKeys = new ArrayList<>(keys.size());
+
+        failedKeys.addAll(keys);
+
+        if (err == null)
+            err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+        err.addSuppressed(e);
+    }
+
+    /** {@inheritDoc}
+     * @param ctx*/
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        if (err != null && errBytes == null)
+            errBytes = ctx.marshaller().marshal(err);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        prepareMarshalCacheObjects(failedKeys, cctx);
+
+        prepareMarshalCacheObjects(remapKeys, cctx);
+
+        prepareMarshalCacheObjects(nearVals, cctx);
+
+        if (ret != null)
+            ret.prepareMarshal(cctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (errBytes != null && err == null)
+            err = ctx.marshaller().unmarshal(errBytes, ldr);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
+
+        finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
+
+        finishUnmarshalCacheObjects(nearVals, cctx, ldr);
+
+        if (ret != null)
+            ret.finishUnmarshal(cctx, ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeMessage("futVer", futVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeMessage("nearTtls", nearTtls))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeMessage("nearVer", nearVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeMessage("ret", ret))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                futVer = reader.readMessage("futVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                nearExpireTimes = reader.readMessage("nearExpireTimes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                nearTtls = reader.readMessage("nearTtls");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                nearVer = reader.readMessage("nearVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                ret = reader.readMessage("ret");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicMultipleUpdateResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -24;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 14;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicSingleUpdateResponse.class, this, "parent");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index c1dab93..68ee67b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -339,7 +339,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
      * @param nodeId Node ID.
      * @param res Update response.
      */
-    public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+    public void onResult(UUID nodeId, GridNearAtomicMultipleUpdateResponse res) {
         state.onResult(nodeId, res, false);
     }
 
@@ -349,7 +349,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
      * @param req Update request.
      * @param res Update response.
      */
-    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicMultipleUpdateResponse res) {
         assert nearEnabled;
 
         if (res.remapKeys() != null || !req.hasPrimary())
@@ -454,9 +454,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
     private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
         if (cctx.localNodeId().equals(nodeId)) {
             cache.updateAllAsyncInternal(nodeId, req,
-                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse>() {
                     @Override public void apply(GridNearAtomicUpdateRequest req,
-                        GridNearAtomicUpdateResponse res) {
+                        GridNearAtomicMultipleUpdateResponse res) {
                         onResult(res.nodeId(), res);
                     }
                 });
@@ -510,8 +510,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
 
         if (locUpdate != null) {
             cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
-                new CI2<GridNearAtomicMultipleUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicMultipleUpdateRequest req, GridNearAtomicUpdateResponse res) {
+                new CI2<GridNearAtomicMultipleUpdateRequest, GridNearAtomicMultipleUpdateResponse>() {
+                    @Override public void apply(GridNearAtomicMultipleUpdateRequest req, GridNearAtomicMultipleUpdateResponse res) {
                         onResult(res.nodeId(), res);
                     }
                 });
@@ -570,7 +570,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
          * @param nodeId Left node ID.
          */
         void onNodeLeft(UUID nodeId) {
-            GridNearAtomicUpdateResponse res = null;
+            GridNearAtomicMultipleUpdateResponse res = null;
 
             synchronized (this) {
                 GridNearAtomicUpdateRequest req;
@@ -581,7 +581,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
                     req = mappings != null ? mappings.get(nodeId) : null;
 
                 if (req != null && req.response() == null) {
-                    res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                    res = new GridNearAtomicMultipleUpdateResponse(cctx.cacheId(),
                         nodeId,
                         req.futureVersion(),
                         cctx.deploymentEnabled());
@@ -605,7 +605,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
          * @param nodeErr {@code True} if response was created on node failure.
          */
         @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-        void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+        void onResult(UUID nodeId, GridNearAtomicMultipleUpdateResponse res, boolean nodeErr) {
             GridNearAtomicUpdateRequest req;
 
             AffinityTopologyVersion remapTopVer = null;
@@ -737,7 +737,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
             if (rcvAll && nearEnabled) {
                 if (mappings != null) {
                     for (GridNearAtomicMultipleUpdateRequest req0 : mappings.values()) {
-                        GridNearAtomicUpdateResponse res0 = req0.response();
+                        GridNearAtomicMultipleUpdateResponse res0 = req0.response();
 
                         assert res0 != null : req0;
 
@@ -812,7 +812,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
          */
         void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
             synchronized (this) {
-                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                GridNearAtomicMultipleUpdateResponse res = new GridNearAtomicMultipleUpdateResponse(cctx.cacheId(),
                     req.nodeId(),
                     req.futureVersion(),
                     cctx.deploymentEnabled());

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 960add7..c1977bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -190,10 +190,10 @@ public interface GridNearAtomicUpdateRequest {
      * @param res Response.
      * @return {@code True} if current response was {@code null}.
      */
-    public boolean onResponse(GridNearAtomicUpdateResponse res);
+    public boolean onResponse(GridNearAtomicMultipleUpdateResponse res);
 
     /**
      * @return Response.
      */
-    @Nullable public GridNearAtomicUpdateResponse response();
+    @Nullable public GridNearAtomicMultipleUpdateResponse response();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 3e3ac29..51d388c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -17,624 +17,86 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.io.Externalizable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * DHT atomic cache near update response.
- */
-public class GridNearAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Cache message index. */
-    public static final int CACHE_MSG_IDX = nextIndexId();
-
-    /** Node ID this reply should be sent to. */
-    @GridDirectTransient
-    private UUID nodeId;
-
-    /** Future version. */
-    private GridCacheVersion futVer;
-
-    /** Update error. */
-    @GridDirectTransient
-    private volatile IgniteCheckedException err;
-
-    /** Serialized error. */
-    private byte[] errBytes;
-
-    /** Return value. */
-    @GridToStringInclude
-    private GridCacheReturn ret;
-
-    /** Failed keys. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private volatile Collection<KeyCacheObject> failedKeys;
-
-    /** Keys that should be remapped. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private List<KeyCacheObject> remapKeys;
-
-    /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
-    @GridDirectCollection(int.class)
-    private List<Integer> nearValsIdxs;
-
-    /** Indexes of keys for which update was skipped (used if originating node has near cache). */
-    @GridDirectCollection(int.class)
-    private List<Integer> nearSkipIdxs;
-
-    /** Values generated on primary node which should be put to originating node's near cache. */
-    @GridToStringInclude
-    @GridDirectCollection(CacheObject.class)
-    private List<CacheObject> nearVals;
-
-    /** Version generated on primary node to be used for originating node's near cache update. */
-    private GridCacheVersion nearVer;
-
-    /** Near TTLs. */
-    private GridLongList nearTtls;
-
-    /** Near expire times. */
-    private GridLongList nearExpireTimes;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridNearAtomicUpdateResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param cacheId Cache ID.
-     * @param nodeId Node ID this reply should be sent to.
-     * @param futVer Future version.
-     * @param addDepInfo Deployment info flag.
-     */
-    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) {
-        assert futVer != null;
-
-        this.cacheId = cacheId;
-        this.nodeId = nodeId;
-        this.futVer = futVer;
-        this.addDepInfo = addDepInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int lookupIndex() {
-        return CACHE_MSG_IDX;
-    }
-
-    /**
-     * @return Node ID this response should be sent to.
-     */
-    public UUID nodeId() {
-        return nodeId;
-    }
-
-    /**
-     * @param nodeId Node ID.
-     */
-    public void nodeId(UUID nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    /**
-     * @return Future version.
-     */
-    public GridCacheVersion futureVersion() {
-        return futVer;
-    }
-
-    /**
-     * Sets update error.
-     *
-     * @param err Error.
-     */
-    public void error(IgniteCheckedException err){
-        this.err = err;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteCheckedException error() {
-        return err;
-    }
-
-    /**
-     * @return Collection of failed keys.
-     */
-    public Collection<KeyCacheObject> failedKeys() {
-        return failedKeys;
-    }
-
-    /**
-     * @return Return value.
-     */
-    public GridCacheReturn returnValue() {
-        return ret;
-    }
-
-    /**
-     * @param ret Return value.
-     */
-    @SuppressWarnings("unchecked")
-    public void returnValue(GridCacheReturn ret) {
-        this.ret = ret;
-    }
-
-    /**
-     * @param remapKeys Remap keys.
-     */
-    public void remapKeys(List<KeyCacheObject> remapKeys) {
-        this.remapKeys = remapKeys;
-    }
-
-    /**
-     * @return Remap keys.
-     */
-    public Collection<KeyCacheObject> remapKeys() {
-        return remapKeys;
-    }
-
-    /**
-     * Adds value to be put in near cache on originating node.
-     *
-     * @param keyIdx Key index.
-     * @param val Value.
-     * @param ttl TTL for near cache update.
-     * @param expireTime Expire time for near cache update.
-     */
-    public void addNearValue(int keyIdx,
-        @Nullable CacheObject val,
-        long ttl,
-        long expireTime) {
-        if (nearValsIdxs == null) {
-            nearValsIdxs = new ArrayList<>();
-            nearVals = new ArrayList<>();
-        }
-
-        addNearTtl(keyIdx, ttl, expireTime);
-
-        nearValsIdxs.add(keyIdx);
-        nearVals.add(val);
-    }
-
-    /**
-     * @param keyIdx Key index.
-     * @param ttl TTL for near cache update.
-     * @param expireTime Expire time for near cache update.
-     */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    public void addNearTtl(int keyIdx, long ttl, long expireTime) {
-        if (ttl >= 0) {
-            if (nearTtls == null) {
-                nearTtls = new GridLongList(16);
-
-                for (int i = 0; i < keyIdx; i++)
-                    nearTtls.add(-1L);
-            }
-        }
-
-        if (nearTtls != null)
-            nearTtls.add(ttl);
-
-        if (expireTime >= 0) {
-            if (nearExpireTimes == null) {
-                nearExpireTimes = new GridLongList(16);
-
-                for (int i = 0; i < keyIdx; i++)
-                    nearExpireTimes.add(-1);
-            }
-        }
-
-        if (nearExpireTimes != null)
-            nearExpireTimes.add(expireTime);
-    }
-
-    /**
-     * @param idx Index.
-     * @return Expire time for near cache update.
-     */
-    public long nearExpireTime(int idx) {
-        if (nearExpireTimes != null) {
-            assert idx >= 0 && idx < nearExpireTimes.size();
-
-            return nearExpireTimes.get(idx);
-        }
-
-        return -1L;
-    }
-
-    /**
-     * @param idx Index.
-     * @return TTL for near cache update.
-     */
-    public long nearTtl(int idx) {
-        if (nearTtls != null) {
-            assert idx >= 0 && idx < nearTtls.size();
-
-            return nearTtls.get(idx);
-        }
-
-        return -1L;
-    }
-
-    /**
-     * @param nearVer Version generated on primary node to be used for originating node's near cache update.
-     */
-    public void nearVersion(GridCacheVersion nearVer) {
-        this.nearVer = nearVer;
-    }
-
-    /**
-     * @return Version generated on primary node to be used for originating node's near cache update.
-     */
-    public GridCacheVersion nearVersion() {
-        return nearVer;
-    }
-
-    /**
-     * @param keyIdx Index of key for which update was skipped
-     */
-    public void addSkippedIndex(int keyIdx) {
-        if (nearSkipIdxs == null)
-            nearSkipIdxs = new ArrayList<>();
-
-        nearSkipIdxs.add(keyIdx);
-
-        addNearTtl(keyIdx, -1L, -1L);
-    }
-
-    /**
-     * @return Indexes of keys for which update was skipped
-     */
-    @Nullable public List<Integer> skippedIndexes() {
-        return nearSkipIdxs;
-    }
-
-    /**
-     * @return Indexes of keys for which values were generated on primary node.
-     */
-   @Nullable public List<Integer> nearValuesIndexes() {
-        return nearValsIdxs;
-   }
-
-    /**
-     * @param idx Index.
-     * @return Value generated on primary node which should be put to originating node's near cache.
-     */
-    @Nullable public CacheObject nearValue(int idx) {
-        return nearVals.get(idx);
-    }
-
-    /**
-     * Adds key to collection of failed keys.
-     *
-     * @param key Key to add.
-     * @param e Error cause.
-     */
-    public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
-        if (failedKeys == null)
-            failedKeys = new ConcurrentLinkedQueue<>();
-
-        failedKeys.add(key);
-
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
-
-        err.addSuppressed(e);
-    }
-
-    /**
-     * Adds keys to collection of failed keys.
-     *
-     * @param keys Key to add.
-     * @param e Error cause.
-     */
-    public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
-        if (keys != null) {
-            if (failedKeys == null)
-                failedKeys = new ArrayList<>(keys.size());
-
-            failedKeys.addAll(keys);
-        }
-
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
-
-        err.addSuppressed(e);
-    }
-
-    /**
-     * Adds keys to collection of failed keys.
-     *
-     * @param keys Key to add.
-     * @param e Error cause.
-     * @param ctx Context.
-     */
-    public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e, GridCacheContext ctx) {
-        if (failedKeys == null)
-            failedKeys = new ArrayList<>(keys.size());
-
-        failedKeys.addAll(keys);
-
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
-
-        err.addSuppressed(e);
-    }
-
-    /** {@inheritDoc}
-     * @param ctx*/
-    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        if (err != null && errBytes == null)
-            errBytes = ctx.marshaller().marshal(err);
-
-        GridCacheContext cctx = ctx.cacheContext(cacheId);
-
-        prepareMarshalCacheObjects(failedKeys, cctx);
-
-        prepareMarshalCacheObjects(remapKeys, cctx);
-
-        prepareMarshalCacheObjects(nearVals, cctx);
-
-        if (ret != null)
-            ret.prepareMarshal(cctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        if (errBytes != null && err == null)
-            err = ctx.marshaller().unmarshal(errBytes, ldr);
-
-        GridCacheContext cctx = ctx.cacheContext(cacheId);
-
-        finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
-
-        finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
-
-        finishUnmarshalCacheObjects(nearVals, cctx, ldr);
-
-        if (ret != null)
-            ret.finishUnmarshal(cctx, ldr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean addDeploymentInfo() {
-        return addDepInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 3:
-                if (!writer.writeByteArray("errBytes", errBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeMessage("futVer", futVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
-                    return false;
-
-                writer.incrementState();
-
-            case 7:
-                if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
-                    return false;
-
-                writer.incrementState();
-
-            case 8:
-                if (!writer.writeMessage("nearTtls", nearTtls))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
-                if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
-                    return false;
-
-                writer.incrementState();
-
-            case 11:
-                if (!writer.writeMessage("nearVer", nearVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 12:
-                if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 13:
-                if (!writer.writeMessage("ret", ret))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 3:
-                errBytes = reader.readByteArray("errBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
 
-            case 5:
-                futVer = reader.readMessage("futVer");
+public interface GridNearAtomicUpdateResponse {
 
-                if (!reader.isLastRead())
-                    return false;
+    int lookupIndex();
 
-                reader.incrementState();
+    UUID nodeId();
 
-            case 6:
-                nearExpireTimes = reader.readMessage("nearExpireTimes");
+    void nodeId(UUID nodeId);
 
-                if (!reader.isLastRead())
-                    return false;
+    GridCacheVersion futureVersion();
 
-                reader.incrementState();
+    void error(IgniteCheckedException err);
 
-            case 7:
-                nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
+    IgniteCheckedException error();
 
-                if (!reader.isLastRead())
-                    return false;
+    Collection<KeyCacheObject> failedKeys();
 
-                reader.incrementState();
+    GridCacheReturn returnValue();
 
-            case 8:
-                nearTtls = reader.readMessage("nearTtls");
+    @SuppressWarnings("unchecked") void returnValue(GridCacheReturn ret);
 
-                if (!reader.isLastRead())
-                    return false;
+    void remapKeys(List<KeyCacheObject> remapKeys);
 
-                reader.incrementState();
+    Collection<KeyCacheObject> remapKeys();
 
-            case 9:
-                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+    void addNearValue(int keyIdx,
+        @Nullable CacheObject val,
+        long ttl,
+        long expireTime);
 
-                if (!reader.isLastRead())
-                    return false;
+    @SuppressWarnings("ForLoopReplaceableByForEach") void addNearTtl(int keyIdx, long ttl, long expireTime);
 
-                reader.incrementState();
+    long nearExpireTime(int idx);
 
-            case 10:
-                nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
+    long nearTtl(int idx);
 
-                if (!reader.isLastRead())
-                    return false;
+    void nearVersion(GridCacheVersion nearVer);
 
-                reader.incrementState();
+    GridCacheVersion nearVersion();
 
-            case 11:
-                nearVer = reader.readMessage("nearVer");
+    void addSkippedIndex(int keyIdx);
 
-                if (!reader.isLastRead())
-                    return false;
+    @Nullable List<Integer> skippedIndexes();
 
-                reader.incrementState();
+    @Nullable List<Integer> nearValuesIndexes();
 
-            case 12:
-                remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+    @Nullable CacheObject nearValue(int idx);
 
-                if (!reader.isLastRead())
-                    return false;
+    void addFailedKey(KeyCacheObject key, Throwable e);
 
-                reader.incrementState();
+    void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e);
 
-            case 13:
-                ret = reader.readMessage("ret");
+    void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e, GridCacheContext ctx);
 
-                if (!reader.isLastRead())
-                    return false;
+    void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException;
 
-                reader.incrementState();
+    void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException;
 
-        }
+    boolean addDeploymentInfo();
 
-        return reader.afterMessageRead(GridNearAtomicUpdateResponse.class);
-    }
+    boolean writeTo(ByteBuffer buf, MessageWriter writer);
 
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 41;
-    }
+    boolean readFrom(ByteBuffer buf, MessageReader reader);
 
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 14;
-    }
+    byte directType();
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridNearAtomicUpdateResponse.class, this, "parent");
-    }
+    byte fieldsCount();
 }


Mime
View raw message