ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [19/26] ignite git commit: Implemented single update request.
Date Wed, 20 Apr 2016 07:12:24 GMT
Implemented single update request.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c560110
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c560110
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c560110

Branch: refs/heads/ignite-2523-1
Commit: 0c5601102a836177351e9428acdd3f0896fd0198
Parents: 5646ef0
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Apr 19 14:54:12 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Apr 19 14:54:12 2016 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   6 +
 .../processors/cache/GridCacheIoManager.java    |  17 +
 .../dht/atomic/GridDhtAtomicCache.java          |  48 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  13 +-
 .../GridNearAtomicAbstractUpdateRequest.java    | 239 +++++
 .../GridNearAtomicSingleUpdateFuture.java       | 130 ++-
 .../GridNearAtomicSingleUpdateRequest.java      | 945 +++++++++++++++++++
 .../dht/atomic/GridNearAtomicUpdateRequest.java | 217 ++---
 .../distributed/near/GridNearAtomicCache.java   |   8 +-
 9 files changed, 1407 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 47b1c5f..e028cf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlock
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
@@ -726,6 +727,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case -23:
+                msg = new GridNearAtomicSingleUpdateRequest();
+
+                break;
+
             // [-3..119] [124] - this
             // [120..123] - DR
             // [-4..-22] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index aab1bcc..82b7604 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
@@ -603,6 +604,22 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             break;
 
+            case -23: {
+                GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
+
+                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+                    ctx.cacheId(),
+                    nodeId,
+                    req.futureVersion(),
+                    ctx.deploymentEnabled());
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
             default:
                 throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
                     + msg + "]", msg.classError());

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 013184b..cbda827 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -140,7 +140,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
     /** Update reply closure. */
     @GridToStringExclude
-    private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
+    private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
 
     /** Pending  */
     private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
@@ -193,9 +193,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
-        updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+        updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-            @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+            @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
                 if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
                     assert req.writeSynchronizationMode() != FULL_ASYNC : req;
 
@@ -257,6 +257,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
+        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicSingleUpdateRequest.class, new CI2<UUID, GridNearAtomicSingleUpdateRequest>() {
+            @Override public void apply(UUID nodeId, GridNearAtomicSingleUpdateRequest req) {
+                processNearAtomicUpdateRequest(nodeId, req);
+            }
+        });
+
         ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
             @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
                 processNearAtomicUpdateResponse(nodeId, res);
@@ -1447,8 +1453,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     public void updateAllAsyncInternal(
         final UUID nodeId,
-        final GridNearAtomicUpdateRequest req,
-        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+        final GridNearAtomicAbstractUpdateRequest req,
+        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
     ) {
         IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
 
@@ -1472,8 +1478,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     public void updateAllAsyncInternal0(
         UUID nodeId,
-        GridNearAtomicUpdateRequest req,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+        GridNearAtomicAbstractUpdateRequest req,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
     ) {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
             ctx.deploymentEnabled());
@@ -1698,12 +1704,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private UpdateBatchResult updateWithBatch(
         final ClusterNode node,
         final boolean hasNear,
-        final GridNearAtomicUpdateRequest req,
+        final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
         final List<GridDhtCacheEntry> locked,
         final GridCacheVersion ver,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         final boolean replicate,
         final String taskName,
         @Nullable final IgniteCacheExpiryPolicy expiry,
@@ -1712,7 +1718,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
         assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
 
-        if (!F.isEmpty(req.filter()) && ctx.loadPreviousValue()) {
+        if (req.hasFilter() && ctx.loadPreviousValue()) {
             try {
                 reloadIfNeeded(locked);
             }
@@ -1723,7 +1729,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
 
-        int size = req.keys().size();
+        int size = req.keysCount();
 
         Map<KeyCacheObject, CacheObject> putMap = null;
 
@@ -2117,12 +2123,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private UpdateSingleResult updateSingle(
         ClusterNode node,
         boolean hasNear,
-        GridNearAtomicUpdateRequest req,
+        GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res,
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
@@ -2379,8 +2385,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable final Collection<KeyCacheObject> rmvKeys,
         @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
-        final GridNearAtomicUpdateRequest req,
+        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
         final boolean replicate,
         final UpdateBatchResult batchRes,
@@ -2784,7 +2790,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      *      will return false.
      * @return {@code True} if filter evaluation succeeded.
      */
-    private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req,
+    private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res) {
         try {
             return ctx.isAllLocked(entry, req.filter());
@@ -2799,7 +2805,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /**
      * @param req Request to remap.
      */
-    private void remapToNewPrimary(GridNearAtomicUpdateRequest req) {
+    private void remapToNewPrimary(GridNearAtomicAbstractUpdateRequest req) {
         assert req.writeSynchronizationMode() == FULL_ASYNC : req;
 
         if (log.isDebugEnabled())
@@ -2816,7 +2822,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             drRmvVals = null;
         }
         else if (req.operation() == UPDATE) {
-            int size = req.keys().size();
+            int size = req.keysCount();
 
             drPutVals = new ArrayList<>(size);
 
@@ -2878,9 +2884,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
-        GridNearAtomicUpdateRequest updateReq,
+        GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean force
     ) {
         if (!force) {
@@ -2911,7 +2917,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param nodeId Sender node ID.
      * @param req Near atomic update request.
      */
-    private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) {
+    private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
         if (log.isDebugEnabled())
             log.debug("Processing near atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4721d6e..5dbf6c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -79,7 +79,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** Completion callback. */
     @GridToStringExclude
-    private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+    private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
 
     /** Mappings. */
     @GridToStringInclude
@@ -89,7 +89,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
 
     /** Update request. */
-    private final GridNearAtomicUpdateRequest updateReq;
+    private final GridNearAtomicAbstractUpdateRequest updateReq;
 
     /** Update response. */
     private final GridNearAtomicUpdateResponse updateRes;
@@ -115,10 +115,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      */
     public GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
-        CI2<GridNearAtomicUpdateRequest,
-        GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         GridCacheVersion writeVer,
-        GridNearAtomicUpdateRequest updateReq,
+        GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes
     ) {
         this.cctx = cctx;
@@ -132,8 +131,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
 
-        keys = new ArrayList<>(updateReq.keys().size());
-        mappings = U.newHashMap(updateReq.keys().size());
+        keys = new ArrayList<>(updateReq.keysCount());
+        mappings = U.newHashMap(updateReq.keysCount());
 
         waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
new file mode 100644
index 0000000..7bceb9b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+
+/**
+ * Abstract near update request.
+ */
+public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /**
+     * @return Mapped node ID.
+     */
+    public abstract UUID nodeId();
+
+    /**
+     * @param nodeId Node ID.
+     */
+    public abstract void nodeId(UUID nodeId);
+
+    /**
+     * @return Subject ID.
+     */
+    public abstract UUID subjectId();
+
+    /**
+     * @return Task name hash.
+     */
+    public abstract int taskNameHash();
+
+    /**
+     * @return Future version.
+     */
+    public abstract GridCacheVersion futureVersion();
+
+    /**
+     * @return Flag indicating whether this is fast-map update.
+     */
+    public abstract boolean fastMap();
+
+    /**
+     * @return Update version for fast-map request.
+     */
+    public abstract GridCacheVersion updateVersion();
+
+    /**
+     * @return Topology locked flag.
+     */
+    public abstract boolean topologyLocked();
+
+    /**
+     * @return {@code True} if request sent from client node.
+     */
+    public abstract boolean clientRequest();
+
+    /**
+     * @return Cache write synchronization mode.
+     */
+    public abstract CacheWriteSynchronizationMode writeSynchronizationMode();
+
+    /**
+     * @return Expiry policy.
+     */
+    public abstract ExpiryPolicy expiry();
+
+    /**
+     * @return Return value flag.
+     */
+    public abstract boolean returnValue();
+
+    /**
+     * @return Filter.
+     */
+    // TODO
+    @Nullable public abstract CacheEntryPredicate[] filter();
+
+    /**
+     * Whether a filter exists.
+     *
+     * @return {@code True} if a filter exists.
+     */
+    public abstract boolean hasFilter();
+
+    /**
+     * @return Skip write-through to a persistent storage.
+     */
+    public abstract boolean skipStore();
+
+    /**
+     * @return Keep binary flag.
+     */
+    public abstract boolean keepBinary();
+
+    /**
+     * @return Keys for this update request.
+     */
+    // TODO
+    public abstract List<KeyCacheObject> keys();
+
+    /**
+     * @return Key number.
+     */
+    public abstract int keysCount();
+
+    /**
+     * @return Values for this update request.
+     */
+    // TODO
+    public abstract List<?> values();
+
+    /**
+     * @return Update operation.
+     */
+    public abstract GridCacheOperation operation();
+
+    /**
+     * @return Optional arguments for entry processor.
+     */
+    @Nullable public abstract Object[] invokeArguments();
+
+    /**
+     * @param idx Key index.
+     * @return Value.
+     */
+    // TODO
+    @SuppressWarnings("unchecked")
+    public abstract CacheObject value(int idx);
+
+    /**
+     * @param idx Key index.
+     * @return Entry processor.
+     */
+    @SuppressWarnings("unchecked")
+    // TODO
+    public abstract EntryProcessor<Object, Object, Object> entryProcessor(int idx);
+
+    /**
+     * @param idx Index to get.
+     * @return Write value - either value, or transform closure.
+     */
+    // TODO
+    public abstract CacheObject writeValue(int idx);
+
+    /**
+     * @return Conflict versions.
+     */
+    // TODO
+    @Nullable public abstract List<GridCacheVersion> conflictVersions();
+
+    /**
+     * @param idx Index.
+     * @return Conflict version.
+     */
+    // TODO
+    @Nullable public abstract GridCacheVersion conflictVersion(int idx);
+
+    /**
+     * @param idx Index.
+     * @return Conflict TTL.
+     */
+    // TODO
+    public abstract long conflictTtl(int idx);
+
+    /**
+     * @param idx Index.
+     * @return Conflict expire time.
+     */
+    // TODO
+    public abstract long conflictExpireTime(int idx);
+
+    /**
+     * @return Flag indicating whether this request contains primary keys.
+     */
+    public abstract boolean hasPrimary();
+
+    /**
+     * @param res Response.
+     * @return {@code True} if current response was {@code null}.
+     */
+    // TODO
+    public abstract boolean onResponse(GridNearAtomicUpdateResponse res);
+
+    /**
+     * @return Response.
+     */
+    // TODO
+    @Nullable public abstract GridNearAtomicUpdateResponse response();
+
+    /**
+     * Cleanup values.
+     *
+     * @param clearKeys If {@code true} clears keys.
+     */
+    public abstract void cleanup(boolean clearKeys);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 7a95640..266ff21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -48,6 +48,7 @@ import javax.cache.expiry.ExpiryPolicy;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -67,7 +68,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     private Object val;
 
     /** Not null is operation is mapped to single node. */
-    private GridNearAtomicUpdateRequest req;
+    private GridNearAtomicAbstractUpdateRequest req;
 
     /**
      * @param cctx Cache context.
@@ -128,7 +129,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         GridNearAtomicUpdateResponse res = null;
 
         synchronized (mux) {
-            GridNearAtomicUpdateRequest req = this.req.nodeId().equals(nodeId) ? this.req : null;
+            GridNearAtomicAbstractUpdateRequest req = this.req.nodeId().equals(nodeId) ? this.req : null;
 
             if (req != null && req.response() == null) {
                 res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
@@ -191,7 +192,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      */
     @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
     public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
-        GridNearAtomicUpdateRequest req;
+        GridNearAtomicAbstractUpdateRequest req;
 
         AffinityTopologyVersion remapTopVer = null;
 
@@ -367,7 +368,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @param req Update request.
      * @param res Update response.
      */
-    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+    private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
         assert nearEnabled;
 
         if (res.remapKeys() != null || !req.hasPrimary())
@@ -438,11 +439,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @param nodeId Node ID.
      * @param req Request.
      */
-    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+    private void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
         if (cctx.localNodeId().equals(nodeId)) {
             cache.updateAllAsyncInternal(nodeId, req,
-                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+                new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
+                    @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
                         onResult(res.nodeId(), res, false);
                     }
                 });
@@ -467,7 +468,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @param req Request.
      * @param e Error.
      */
-    void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+    void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
         synchronized (mux) {
             GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
                 req.nodeId(),
@@ -492,7 +493,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         }
 
         Exception err = null;
-        GridNearAtomicUpdateRequest singleReq0 = null;
+        GridNearAtomicAbstractUpdateRequest singleReq0 = null;
 
         GridCacheVersion futVer = cctx.versions().next(topVer);
 
@@ -581,7 +582,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @return Request.
      * @throws Exception If failed.
      */
-    private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+    private GridNearAtomicAbstractUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
         GridCacheVersion futVer,
         @Nullable GridCacheVersion updVer) throws Exception {
         if (key == null)
@@ -597,40 +598,89 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         if (op != TRANSFORM)
             val = cctx.toCacheObject(val);
 
-        ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+        List<ClusterNode> affNodes = cctx.affinity().nodes(cacheKey, topVer);
 
-        if (primary == null)
+        if (F.isEmpty(affNodes))
             throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
                 "left the grid).");
 
-        GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
-            cctx.cacheId(),
-            primary.id(),
-            futVer,
-            false,
-            updVer,
-            topVer,
-            topLocked,
-            syncMode,
-            op,
-            retval,
-            expiryPlc,
-            invokeArgs,
-            filter,
-            subjId,
-            taskNameHash,
-            skipStore,
-            keepBinary,
-            cctx.kernalContext().clientNode(),
-            cctx.deploymentEnabled(),
-            1);
-
-        req.addUpdateEntry(cacheKey,
-            val,
-            CU.TTL_NOT_CHANGED,
-            CU.EXPIRE_TIME_CALCULATE,
-            null,
-            true);
+        // TODO: Can we change it to affNodes.get(0)?
+        ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+
+        assert primary != null;
+
+        boolean single = true;
+
+        // TODO: Instead, we could implement a method on affinity to check if all nodes are greater than particular ver.
+        for (ClusterNode affNode : affNodes) {
+            if (!affNode.version().greaterThanEqual(1, 1, 6)) {
+                single = false;
+
+                break;
+            }
+        }
+
+        GridNearAtomicAbstractUpdateRequest req;
+
+        if (single) {
+            GridNearAtomicSingleUpdateRequest req0 = new GridNearAtomicSingleUpdateRequest(
+                cctx.cacheId(),
+                primary.id(),
+                futVer,
+                false,
+                updVer,
+                topVer,
+                topLocked,
+                syncMode,
+                op,
+                retval,
+                expiryPlc,
+                invokeArgs,
+                filter,
+                subjId,
+                taskNameHash,
+                skipStore,
+                keepBinary,
+                cctx.kernalContext().clientNode(),
+                cctx.deploymentEnabled(),
+                1);
+
+            req0.addUpdateEntry(cacheKey, val, true);
+
+            req = req0;
+        }
+        else {
+            GridNearAtomicUpdateRequest req0 = new GridNearAtomicUpdateRequest(
+                cctx.cacheId(),
+                primary.id(),
+                futVer,
+                false,
+                updVer,
+                topVer,
+                topLocked,
+                syncMode,
+                op,
+                retval,
+                expiryPlc,
+                invokeArgs,
+                filter,
+                subjId,
+                taskNameHash,
+                skipStore,
+                keepBinary,
+                cctx.kernalContext().clientNode(),
+                cctx.deploymentEnabled(),
+                1);
+
+            req0.addUpdateEntry(cacheKey,
+                val,
+                CU.TTL_NOT_CHANGED,
+                CU.EXPIRE_TIME_CALCULATE,
+                null,
+                true);
+
+            req = req0;
+        }
 
         return req;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
new file mode 100644
index 0000000..741a4d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -0,0 +1,945 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+
+/**
+ * Atomic near single update request.
+ */
+public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpdateRequest {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Target node ID. */
+    @GridDirectTransient
+    private UUID nodeId;
+
+    /** Future version. */
+    private GridCacheVersion futVer;
+
+    /** Fast map flag. */
+    private boolean fastMap;
+
+    /** Update version. Set to non-null if fastMap is {@code true}. */
+    private GridCacheVersion updateVer;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+    private boolean topLocked;
+
+    /** Write synchronization mode. */
+    private CacheWriteSynchronizationMode syncMode;
+
+    /** Update operation. */
+    private GridCacheOperation op;
+
+    /** Keys to update. */
+    @GridToStringInclude
+    @GridDirectCollection(KeyCacheObject.class)
+    private List<KeyCacheObject> keys;
+
+    /** Values to update. */
+    @GridDirectCollection(CacheObject.class)
+    private List<CacheObject> vals;
+
+    /** Entry processors. */
+    @GridDirectTransient
+    private List<EntryProcessor<Object, Object, Object>> entryProcessors;
+
+    /** Entry processors bytes. */
+    @GridDirectCollection(byte[].class)
+    private List<byte[]> entryProcessorsBytes;
+
+    /** Optional arguments for entry processor. */
+    @GridDirectTransient
+    private Object[] invokeArgs;
+
+    /** Entry processor arguments bytes. */
+    private byte[][] invokeArgsBytes;
+
+    /** Conflict versions. */
+    @GridDirectCollection(GridCacheVersion.class)
+    private List<GridCacheVersion> conflictVers;
+
+    /** Conflict TTLs. */
+    private GridLongList conflictTtls;
+
+    /** Conflict expire times. */
+    private GridLongList conflictExpireTimes;
+
+    /** Return value flag. */
+    private boolean retval;
+
+    /** Expiry policy. */
+    @GridDirectTransient
+    private ExpiryPolicy expiryPlc;
+
+    /** Expiry policy bytes. */
+    private byte[] expiryPlcBytes;
+
+    /** Filter. */
+    private CacheEntryPredicate[] filter;
+
+    /** Flag indicating whether request contains primary keys. */
+    private boolean hasPrimary;
+
+    /** Subject ID. */
+    private UUID subjId;
+
+    /** Task name hash. */
+    private int taskNameHash;
+
+    /** Skip write-through to a persistent storage. */
+    private boolean skipStore;
+
+    /** Client node request flag. */
+    private boolean clientReq;
+
+    /** Keep binary flag. */
+    private boolean keepBinary;
+
+    /** Response recorded by the first {@code onResponse} call; not sent over the wire. */
+    @GridDirectTransient
+    private GridNearAtomicUpdateResponse res;
+
+    /** Maximum possible size of inner collections. */
+    @GridDirectTransient
+    private int initSize;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridNearAtomicSingleUpdateRequest() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID.
+     * @param futVer Future version.
+     * @param fastMap Fast map scheme flag.
+     * @param updateVer Update version set if fast map is performed.
+     * @param topVer Topology version.
+     * @param topLocked Topology locked flag.
+     * @param syncMode Synchronization mode.
+     * @param op Cache update operation.
+     * @param retval Return value required flag.
+     * @param expiryPlc Expiry policy.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @param filter Optional filter for atomic check.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip write-through to a persistent storage.
+     * @param keepBinary Keep binary flag.
+     * @param clientReq Client node request flag.
+     * @param addDepInfo Deployment info flag.
+     * @param maxEntryCnt Maximum entries count.
+     */
+    public GridNearAtomicSingleUpdateRequest(
+        int cacheId,
+        UUID nodeId,
+        GridCacheVersion futVer,
+        boolean fastMap,
+        @Nullable GridCacheVersion updateVer,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean topLocked,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        boolean retval,
+        @Nullable ExpiryPolicy expiryPlc,
+        @Nullable Object[] invokeArgs,
+        @Nullable CacheEntryPredicate[] filter,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        boolean keepBinary,
+        boolean clientReq,
+        boolean addDepInfo,
+        int maxEntryCnt
+    ) {
+        assert futVer != null;
+
+        this.cacheId = cacheId;
+        this.nodeId = nodeId;
+        this.futVer = futVer;
+        this.fastMap = fastMap;
+        this.updateVer = updateVer;
+
+        this.topVer = topVer;
+        this.topLocked = topLocked;
+        this.syncMode = syncMode;
+        this.op = op;
+        this.retval = retval;
+        this.expiryPlc = expiryPlc;
+        this.invokeArgs = invokeArgs;
+        this.filter = filter;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.skipStore = skipStore;
+        this.keepBinary = keepBinary;
+        this.clientReq = clientReq;
+        this.addDepInfo = addDepInfo;
+
+        // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
+        // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
+        // participate in request. As such, we know upper bound of all collections in request. If this bound is lower
+        // than 10, we use it.
+        initSize = Math.min(maxEntryCnt, 10);
+
+        keys = new ArrayList<>(initSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID nodeId() {
+        return nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID subjectId() {
+        return subjId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion futureVersion() {
+        return futVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean fastMap() {
+        return fastMap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion updateVersion() {
+        return updateVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean topologyLocked() {
+        return topLocked;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean clientRequest() {
+        return clientReq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
+        return syncMode;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExpiryPolicy expiry() {
+        return expiryPlc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean returnValue() {
+        return retval;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheEntryPredicate[] filter() {
+        return filter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasFilter() {
+        return !F.isEmpty(filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipStore() {
+        return skipStore;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean keepBinary() {
+        return keepBinary;
+    }
+
+    /**
+     * @param key Key to add.
+     * @param val Optional update value.
+     * @param primary If given key is primary on this mapping.
+     */
+    @SuppressWarnings("unchecked")
+    public void addUpdateEntry(KeyCacheObject key, @Nullable Object val, boolean primary) {
+        EntryProcessor<Object, Object, Object> entryProcessor = null;
+
+        if (op == TRANSFORM) {
+            assert val instanceof EntryProcessor : val;
+
+            entryProcessor = (EntryProcessor<Object, Object, Object>) val;
+        }
+
+        assert val != null || op == DELETE;
+
+        keys.add(key);
+
+        if (entryProcessor != null) {
+            if (entryProcessors == null)
+                entryProcessors = new ArrayList<>(initSize);
+
+            entryProcessors.add(entryProcessor);
+        }
+        else if (val != null) {
+            assert val instanceof CacheObject : val;
+
+            if (vals == null)
+                vals = new ArrayList<>(initSize);
+
+            vals.add((CacheObject)val);
+        }
+
+        hasPrimary |= primary;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<KeyCacheObject> keys() {
+        return keys;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int keysCount() {
+        return keys.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<?> values() {
+        return op == TRANSFORM ? entryProcessors : vals;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheOperation operation() {
+        return op;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public Object[] invokeArguments() {
+        return invokeArgs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheObject value(int idx) {
+        assert op == UPDATE : op;
+
+        return vals.get(idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+        assert op == TRANSFORM : op;
+
+        return entryProcessors.get(idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheObject writeValue(int idx) {
+        if (vals != null)
+            return vals.get(idx);
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public List<GridCacheVersion> conflictVersions() {
+        return conflictVers;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
+        if (conflictVers != null) {
+            assert idx >= 0 && idx < conflictVers.size();
+
+            return conflictVers.get(idx);
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long conflictTtl(int idx) {
+        if (conflictTtls != null) {
+            assert idx >= 0 && idx < conflictTtls.size();
+
+            return conflictTtls.get(idx);
+        }
+
+        return CU.TTL_NOT_CHANGED;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long conflictExpireTime(int idx) {
+        if (conflictExpireTimes != null) {
+            assert idx >= 0 && idx < conflictExpireTimes.size();
+
+            return conflictExpireTimes.get(idx);
+        }
+
+        return CU.EXPIRE_TIME_CALCULATE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasPrimary() {
+        return hasPrimary;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
+        if (this.res == null) {
+            this.res = res;
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public GridNearAtomicUpdateResponse response() {
+        return res;
+    }
+
+    /** {@inheritDoc}
+     * @param ctx Shared cache context used to look up the cache context for this request's cache ID. */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        prepareMarshalCacheObjects(keys, cctx);
+
+        if (filter != null) {
+            boolean hasFilter = false;
+
+            for (CacheEntryPredicate p : filter) {
+                if (p != null) {
+                    hasFilter = true;
+
+                    p.prepareMarshal(cctx);
+                }
+            }
+
+            if (!hasFilter)
+                filter = null;
+        }
+
+        if (expiryPlc != null && expiryPlcBytes == null)
+            expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+
+        if (op == TRANSFORM) {
+            // Force addition of deployment info for entry processors if P2P is enabled globally.
+            if (!addDepInfo && ctx.deploymentEnabled())
+                addDepInfo = true;
+
+            if (entryProcessorsBytes == null)
+                entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+
+            if (invokeArgsBytes == null)
+                invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+        }
+        else
+            prepareMarshalCacheObjects(vals, cctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        finishUnmarshalCacheObjects(keys, cctx, ldr);
+
+        if (op == TRANSFORM) {
+            if (entryProcessors == null)
+                entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+            if (invokeArgs == null)
+                invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+        }
+        else
+            finishUnmarshalCacheObjects(vals, cctx, ldr);
+
+        if (filter != null) {
+            for (CacheEntryPredicate p : filter) {
+                if (p != null)
+                    p.finishUnmarshal(cctx, ldr);
+            }
+        }
+
+        if (expiryPlcBytes != null && expiryPlc == null)
+            expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeBoolean("clientReq", clientReq))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeMessage("conflictTtls", conflictTtls))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeBoolean("fastMap", fastMap))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeMessage("futVer", futVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeBoolean("hasPrimary", hasPrimary))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
+                if (!writer.writeBoolean("keepBinary", keepBinary))
+                    return false;
+
+                writer.incrementState();
+
+            case 15:
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 16:
+                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 17:
+                if (!writer.writeBoolean("retval", retval))
+                    return false;
+
+                writer.incrementState();
+
+            case 18:
+                if (!writer.writeBoolean("skipStore", skipStore))
+                    return false;
+
+                writer.incrementState();
+
+            case 19:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 20:
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 21:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 22:
+                if (!writer.writeBoolean("topLocked", topLocked))
+                    return false;
+
+                writer.incrementState();
+
+            case 23:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 24:
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 25:
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                clientReq = reader.readBoolean("clientReq");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                conflictTtls = reader.readMessage("conflictTtls");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                fastMap = reader.readBoolean("fastMap");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                futVer = reader.readMessage("futVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                hasPrimary = reader.readBoolean("hasPrimary");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
+                keepBinary = reader.readBoolean("keepBinary");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 16:
+                byte opOrd;
+
+                opOrd = reader.readByte("op");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                op = GridCacheOperation.fromOrdinal(opOrd);
+
+                reader.incrementState();
+
+            case 17:
+                retval = reader.readBoolean("retval");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 18:
+                skipStore = reader.readBoolean("skipStore");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 19:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
+                byte syncModeOrd;
+
+                syncModeOrd = reader.readByte("syncMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+                reader.incrementState();
+
+            case 21:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 22:
+                topLocked = reader.readBoolean("topLocked");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 23:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 24:
+                updateVer = reader.readMessage("updateVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 25:
+                vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicSingleUpdateRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanup(boolean clearKeys) {
+        vals = null;
+        entryProcessors = null;
+        entryProcessorsBytes = null;
+        invokeArgs = null;
+        invokeArgsBytes = null;
+
+        if (clearKeys)
+            keys = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -23;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 26;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicSingleUpdateRequest.class, this, "filter", Arrays.toString(filter),
+            "parent", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 375c02f..f061868 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -33,8 +33,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -42,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -58,13 +57,10 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPD
 /**
  * Lite DHT cache update request sent from near node to primary node.
  */
-public class GridNearAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+public class GridNearAtomicUpdateRequest extends GridNearAtomicAbstractUpdateRequest {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Message index. */
-    public static final int CACHE_MSG_IDX = nextIndexId();
-
     /** Target node ID. */
     @GridDirectTransient
     private UUID nodeId;
@@ -249,119 +245,87 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     }
 
     /** {@inheritDoc} */
-    @Override public int lookupIndex() {
-        return CACHE_MSG_IDX;
-    }
-
-    /**
-     * @return Mapped node ID.
-     */
-    public UUID nodeId() {
+    @Override public UUID nodeId() {
         return nodeId;
     }
 
-    /**
-     * @param nodeId Node ID.
-     */
-    public void nodeId(UUID nodeId) {
+    /** {@inheritDoc} */
+    @Override public void nodeId(UUID nodeId) {
         this.nodeId = nodeId;
     }
 
-    /**
-     * @return Subject ID.
-     */
-    public UUID subjectId() {
+    /** {@inheritDoc} */
+    @Override public UUID subjectId() {
         return subjId;
     }
 
-    /**
-     * @return Task name hash.
-     */
-    public int taskNameHash() {
+    /** {@inheritDoc} */
+    @Override public int taskNameHash() {
         return taskNameHash;
     }
 
-    /**
-     * @return Future version.
-     */
-    public GridCacheVersion futureVersion() {
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion futureVersion() {
         return futVer;
     }
 
-    /**
-     * @return Flag indicating whether this is fast-map udpate.
-     */
-    public boolean fastMap() {
+    /** {@inheritDoc} */
+    @Override public boolean fastMap() {
         return fastMap;
     }
 
-    /**
-     * @return Update version for fast-map request.
-     */
-    public GridCacheVersion updateVersion() {
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion updateVersion() {
         return updateVer;
     }
 
-    /**
-     * @return Topology version.
-     */
+    /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
-    /**
-     * @return Topology locked flag.
-     */
-    public boolean topologyLocked() {
+    /** {@inheritDoc} */
+    @Override public boolean topologyLocked() {
         return topLocked;
     }
 
-    /**
-     * @return {@code True} if request sent from client node.
-     */
-    public boolean clientRequest() {
+    /** {@inheritDoc} */
+    @Override public boolean clientRequest() {
         return clientReq;
     }
 
-    /**
-     * @return Cache write synchronization mode.
-     */
-    public CacheWriteSynchronizationMode writeSynchronizationMode() {
+    /** {@inheritDoc} */
+    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
         return syncMode;
     }
 
-    /**
-     * @return Expiry policy.
-     */
-    public ExpiryPolicy expiry() {
+    /** {@inheritDoc} */
+    @Override public ExpiryPolicy expiry() {
         return expiryPlc;
     }
 
-    /**
-     * @return Return value flag.
-     */
-    public boolean returnValue() {
+    /** {@inheritDoc} */
+    @Override public boolean returnValue() {
         return retval;
     }
 
-    /**
-     * @return Filter.
-     */
-    @Nullable public CacheEntryPredicate[] filter() {
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheEntryPredicate[] filter() {
         return filter;
     }
 
-    /**
-     * @return Skip write-through to a persistent storage.
-     */
-    public boolean skipStore() {
+    /** {@inheritDoc} True when {@code F.isEmpty(filter)} is false. */
+    @Override public boolean hasFilter() {
+        return !F.isEmpty(filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipStore() {
         return skipStore;
     }
 
-    /**
-     * @return Keep binary flag.
-     */
-    public boolean keepBinary() {
+    /** {@inheritDoc} */
+    @Override public boolean keepBinary() {
         return keepBinary;
     }
 
@@ -373,6 +337,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
      * @param conflictVer Conflict version (optional).
      * @param primary If given key is primary on this mapping.
      */
+    @SuppressWarnings("unchecked")
     public void addUpdateEntry(KeyCacheObject key,
         @Nullable Object val,
         long conflictTtl,
@@ -445,79 +410,60 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         }
     }
 
-    /**
-     * @return Keys for this update request.
-     */
-    public List<KeyCacheObject> keys() {
+    /** {@inheritDoc} */
+    @Override public List<KeyCacheObject> keys() {
         return keys;
     }
 
-    /**
-     * @return Values for this update request.
-     */
-    public List<?> values() {
+    /** {@inheritDoc} Delegates to {@code keys.size()}. NOTE(review): assumes {@code keys} is non-null - confirm. */
+    @Override public int keysCount() {
+        return keys.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<?> values() {
         return op == TRANSFORM ? entryProcessors : vals;
     }
 
-    /**
-     * @return Update operation.
-     */
-    public GridCacheOperation operation() {
+    /** {@inheritDoc} */
+    @Override public GridCacheOperation operation() {
         return op;
     }
 
-    /**
-     * @return Optional arguments for entry processor.
-     */
-    @Nullable public Object[] invokeArguments() {
+    /** {@inheritDoc} */
+    @Override @Nullable public Object[] invokeArguments() {
         return invokeArgs;
     }
 
-    /**
-     * @param idx Key index.
-     * @return Value.
-     */
-    @SuppressWarnings("unchecked")
-    public CacheObject value(int idx) {
+    /** {@inheritDoc} */
+    @Override public CacheObject value(int idx) {
         assert op == UPDATE : op;
 
         return vals.get(idx);
     }
 
-    /**
-     * @param idx Key index.
-     * @return Entry processor.
-     */
-    @SuppressWarnings("unchecked")
-    public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+    /** {@inheritDoc} */
+    @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
         assert op == TRANSFORM : op;
 
         return entryProcessors.get(idx);
     }
 
-    /**
-     * @param idx Index to get.
-     * @return Write value - either value, or transform closure.
-     */
-    public CacheObject writeValue(int idx) {
+    /** {@inheritDoc} */
+    @Override public CacheObject writeValue(int idx) {
         if (vals != null)
             return vals.get(idx);
 
         return null;
     }
 
-    /**
-     * @return Conflict versions.
-     */
-    @Nullable public List<GridCacheVersion> conflictVersions() {
+    /** {@inheritDoc} */
+    @Override @Nullable public List<GridCacheVersion> conflictVersions() {
         return conflictVers;
     }
 
-    /**
-     * @param idx Index.
-     * @return Conflict version.
-     */
-    @Nullable public GridCacheVersion conflictVersion(int idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
         if (conflictVers != null) {
             assert idx >= 0 && idx < conflictVers.size();
 
@@ -527,11 +473,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         return null;
     }
 
-    /**
-     * @param idx Index.
-     * @return Conflict TTL.
-     */
-    public long conflictTtl(int idx) {
+    /** {@inheritDoc} */
+    @Override public long conflictTtl(int idx) {
         if (conflictTtls != null) {
             assert idx >= 0 && idx < conflictTtls.size();
 
@@ -541,11 +484,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         return CU.TTL_NOT_CHANGED;
     }
 
-    /**
-     * @param idx Index.
-     * @return Conflict expire time.
-     */
-    public long conflictExpireTime(int idx) {
+    /** {@inheritDoc} */
+    @Override public long conflictExpireTime(int idx) {
         if (conflictExpireTimes != null) {
             assert idx >= 0 && idx < conflictExpireTimes.size();
 
@@ -555,18 +495,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         return CU.EXPIRE_TIME_CALCULATE;
     }
 
-    /**
-     * @return Flag indicating whether this request contains primary keys.
-     */
-    public boolean hasPrimary() {
+    /** {@inheritDoc} */
+    @Override public boolean hasPrimary() {
         return hasPrimary;
     }
 
-    /**
-     * @param res Response.
-     * @return {@code True} if current response was {@code null}.
-     */
-    public boolean onResponse(GridNearAtomicUpdateResponse res) {
+    /** {@inheritDoc} */
+    @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
         if (this.res == null) {
             this.res = res;
 
@@ -576,10 +511,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         return false;
     }
 
-    /**
-     * @return Response.
-     */
-    @Nullable public GridNearAtomicUpdateResponse response() {
+    /** {@inheritDoc} */
+    @Override @Nullable public GridNearAtomicUpdateResponse response() {
         return res;
     }
 
@@ -1025,12 +958,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         return reader.afterMessageRead(GridNearAtomicUpdateRequest.class);
     }
 
-    /**
-     * Cleanup values.
-     *
-     * @param clearKeys If {@code true} clears keys.
-     */
-    public void cleanup(boolean clearKeys) {
+    /** {@inheritDoc} */
+    @Override public void cleanup(boolean clearKeys) {
         vals = null;
         entryProcessors = null;
         entryProcessorsBytes = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index a7481d3..35e554d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -45,7 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -127,10 +127,10 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
      * @param res Update response.
      */
     public void processNearAtomicUpdateResponse(
-        GridNearAtomicUpdateRequest req,
+        GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res
     ) {
-        if (F.size(res.failedKeys()) == req.keys().size())
+        if (F.size(res.failedKeys()) == req.keysCount())
             return;
 
         /*
@@ -153,7 +153,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
-        for (int i = 0; i < req.keys().size(); i++) {
+        for (int i = 0; i < req.keysCount(); i++) {
             if (F.contains(skipped, i))
                 continue;
 


Mime
View raw message