ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [13/15] ignite git commit: IGNITE-2523 "single put" NEAR update request
Date Mon, 21 Nov 2016 13:43:01 GMT
IGNITE-2523 "single put" NEAR update request


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a24a394b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a24a394b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a24a394b

Branch: refs/heads/ignite-4242
Commit: a24a394bb66ba0237a9e9ef940707d422b2980f0
Parents: 0234f67
Author: Konstantin Dudkov <kdudkov@ya.ru>
Authored: Mon Nov 21 13:53:58 2016 +0300
Committer: Konstantin Dudkov <kdudkov@ya.ru>
Committed: Mon Nov 21 13:53:58 2016 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   24 +-
 .../processors/cache/GridCacheIoManager.java    |   64 +-
 .../processors/cache/GridCachePreloader.java    |   11 +
 .../cache/GridCachePreloaderAdapter.java        |    7 +
 .../dht/atomic/GridDhtAtomicCache.java          |   96 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   12 +-
 ...idNearAtomicAbstractSingleUpdateRequest.java |  562 +++++++++
 .../GridNearAtomicAbstractUpdateFuture.java     |   15 +-
 .../GridNearAtomicAbstractUpdateRequest.java    |  226 ++++
 .../atomic/GridNearAtomicFullUpdateRequest.java | 1031 +++++++++++++++++
 ...GridNearAtomicSingleUpdateFilterRequest.java |  226 ++++
 .../GridNearAtomicSingleUpdateFuture.java       |  137 ++-
 ...GridNearAtomicSingleUpdateInvokeRequest.java |  303 +++++
 .../GridNearAtomicSingleUpdateRequest.java      |  359 ++++++
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   42 +-
 .../dht/atomic/GridNearAtomicUpdateRequest.java | 1092 ------------------
 .../dht/preloader/GridDhtPreloader.java         |   20 +
 .../distributed/near/GridNearAtomicCache.java   |   10 +-
 .../resources/META-INF/classnames.properties    |   77 +-
 .../CacheAtomicSingleMessageCountSelfTest.java  |  259 +++++
 .../GridCacheAtomicMessageCountSelfTest.java    |   18 +-
 .../IgniteCacheAtomicStopBusySelfTest.java      |   10 +-
 .../IgniteCacheP2pUnmarshallingErrorTest.java   |    2 +-
 ...niteCacheClientNodeChangingTopologyTest.java |   22 +-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |    2 +-
 25 files changed, 3332 insertions(+), 1295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index fd55224..b20de68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -69,7 +69,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlock
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
@@ -392,7 +395,7 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case 40:
-                msg = new GridNearAtomicUpdateRequest();
+                msg = new GridNearAtomicFullUpdateRequest();
 
                 break;
 
@@ -756,7 +759,22 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..119] [124] - this
+            case 125:
+                msg = new GridNearAtomicSingleUpdateRequest();
+
+                break;
+
+            case 126:
+                msg = new GridNearAtomicSingleUpdateInvokeRequest();
+
+                break;
+
+            case 127:
+                msg = new GridNearAtomicSingleUpdateFilterRequest();
+
+                break;
+
+            // [-3..119] [124..127] - this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index e450287..c5c1c60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -47,7 +47,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
@@ -462,8 +466,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @return Atomic future ID if applicable for message.
      */
     @Nullable private GridCacheVersion atomicFututeId(GridCacheMessage cacheMsg) {
-        if (cacheMsg instanceof GridNearAtomicUpdateRequest)
-            return ((GridNearAtomicUpdateRequest)cacheMsg).futureVersion();
+        if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
+            return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
         else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
             return ((GridNearAtomicUpdateResponse) cacheMsg).futureVersion();
         else if (cacheMsg instanceof GridDhtAtomicUpdateRequest)
@@ -480,8 +484,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @return Atomic future ID if applicable for message.
      */
     @Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) {
-        if (cacheMsg instanceof GridNearAtomicUpdateRequest)
-            return ((GridNearAtomicUpdateRequest)cacheMsg).updateVersion();
+        if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
+            return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).updateVersion();
         else if (cacheMsg instanceof GridDhtAtomicUpdateRequest)
             return ((GridDhtAtomicUpdateRequest)cacheMsg).writeVersion();
 
@@ -562,7 +566,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             break;
 
             case 40: {
-                GridNearAtomicUpdateRequest req = (GridNearAtomicUpdateRequest)msg;
+                GridNearAtomicFullUpdateRequest req = (GridNearAtomicFullUpdateRequest)msg;
 
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
@@ -739,6 +743,54 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             break;
 
+            case 125: {
+                GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
+
+                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+                    ctx.cacheId(),
+                    nodeId,
+                    req.futureVersion(),
+                    ctx.deploymentEnabled());
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
+            case 126: {
+                GridNearAtomicSingleUpdateInvokeRequest req = (GridNearAtomicSingleUpdateInvokeRequest)msg;
+
+                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+                    ctx.cacheId(),
+                    nodeId,
+                    req.futureVersion(),
+                    ctx.deploymentEnabled());
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
+            case 127: {
+                GridNearAtomicSingleUpdateFilterRequest req = (GridNearAtomicSingleUpdateFilterRequest)msg;
+
+                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+                    ctx.cacheId(),
+                    nodeId,
+                    req.futureVersion(),
+                    ctx.deploymentEnabled());
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
             default:
                 throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
                     + msg + "]", msg.classError());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index a49bb04..1d1cfab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -140,6 +141,16 @@ public interface GridCachePreloader {
     public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer);
 
     /**
+     * Requests that the preloader sends the request for the keys contained in the given update request.
+     *
+     * @param req Message with keys to request.
+     * @param topVer Topology version, {@code -1} if not required.
+     * @return Future to complete when all keys are preloaded.
+     */
+    public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req,
+        AffinityTopologyVersion topVer);
+
+    /**
      * Force preload process.
      */
     public void forcePreload();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 58b75df..b15ebc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -26,6 +26,7 @@ import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -149,6 +150,12 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req,
+        AffinityTopologyVersion topVer) {
+        return new GridFinishedFuture<>();
+    }
+
+    /** {@inheritDoc} */
     @Override public void onInitialExchangeComplete(@Nullable Throwable err) {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 30a3d57..f7d1973 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -136,7 +136,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
     /** Update reply closure. */
     @GridToStringExclude
-    private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
+    private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
 
     /** Pending */
     private GridDeferredAckMessageSender deferredUpdateMessageSender;
@@ -200,9 +200,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override protected void init() {
         super.init();
 
-        updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+        updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-            @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+            @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
                 if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
                     assert req.writeSynchronizationMode() != FULL_ASYNC : req;
 
@@ -323,11 +323,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         ctx.io().addHandler(
             ctx.cacheId(),
-            GridNearAtomicUpdateRequest.class,
-            new CI2<UUID, GridNearAtomicUpdateRequest>() {
+            GridNearAtomicAbstractUpdateRequest.class,
+            new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() {
                 @Override public void apply(
                     UUID nodeId,
-                    GridNearAtomicUpdateRequest req
+                    GridNearAtomicAbstractUpdateRequest req
                 ) {
                     processNearAtomicUpdateRequest(
                         nodeId,
@@ -335,8 +335,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 @Override public String toString() {
-                    return "GridNearAtomicUpdateRequest handler " +
-                        "[msgIdx=" + GridNearAtomicUpdateRequest.CACHE_MSG_IDX + ']';
+                    return "GridNearAtomicAbstractUpdateRequest handler " +
+                        "[msgIdx=" + GridNearAtomicAbstractUpdateRequest.CACHE_MSG_IDX + ']';
                 }
             });
 
@@ -1252,7 +1252,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         CacheEntryPredicate[] filters = CU.filterArray(filter);
 
-        if (conflictPutVal == null && conflictRmvVer == null && !isFastMap(filters, op)) {
+        if (conflictPutVal == null &&
+            conflictRmvVer == null &&
+            !isFastMap(filters, op)) {
             return new GridNearAtomicSingleUpdateFuture(
                 ctx,
                 this,
@@ -1603,10 +1605,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     public void updateAllAsyncInternal(
         final UUID nodeId,
-        final GridNearAtomicUpdateRequest req,
-        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+        final GridNearAtomicAbstractUpdateRequest req,
+        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
     ) {
-        IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
+        IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
 
         if (forceFut == null || forceFut.isDone()) {
             try {
@@ -1652,8 +1654,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param e Error.
      */
     private void onForceKeysError(final UUID nodeId,
-        final GridNearAtomicUpdateRequest req,
-        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        final GridNearAtomicAbstractUpdateRequest req,
+        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         IgniteCheckedException e
     ) {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
@@ -1673,17 +1675,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param req Update request.
      * @param completionCb Completion callback.
      */
-    public void updateAllAsyncInternal0(
+    private void updateAllAsyncInternal0(
         UUID nodeId,
-        GridNearAtomicUpdateRequest req,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+        GridNearAtomicAbstractUpdateRequest req,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
     ) {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
             ctx.deploymentEnabled());
 
-        List<KeyCacheObject> keys = req.keys();
-
-        assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1);
+        assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
 
         GridDhtAtomicUpdateFuture dhtFut = null;
 
@@ -1696,7 +1696,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         try {
             // If batch store update is enabled, we need to lock all entries.
             // First, need to acquire locks on cache entries, then check filter.
-            List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion());
+            List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion());
 
             Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
 
@@ -1707,7 +1707,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 try {
                     if (top.stopping()) {
-                        res.addFailedKeys(keys, new IgniteCheckedException("Failed to perform cache operation " +
+                        res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed to perform cache operation " +
                             "(cache is stopped): " + name()));
 
                         completionCb.apply(req, res);
@@ -1757,7 +1757,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         GridCacheReturn retVal = null;
 
-                        if (keys.size() > 1 &&                    // Several keys ...
+                        if (req.size() > 1 &&                    // Several keys ...
                             writeThrough() && !req.skipStore() && // and store is enabled ...
                             !ctx.store().isLocal() &&             // and this is not local store ...
                                                                   // (conflict resolver should be used for local store)
@@ -1853,7 +1853,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             // an attempt to use cleaned resources.
             U.error(log, "Unexpected exception during cache update", e);
 
-            res.addFailedKeys(keys, e);
+            res.addFailedKeys(req.keys(), e);
 
             completionCb.apply(req, res);
 
@@ -1866,7 +1866,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         if (remap) {
             assert dhtFut == null;
 
-            res.remapKeys(keys);
+            res.remapKeys(req.keys());
 
             completionCb.apply(req, res);
         }
@@ -1904,12 +1904,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private UpdateBatchResult updateWithBatch(
         final ClusterNode node,
         final boolean hasNear,
-        final GridNearAtomicUpdateRequest req,
+        final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
         final List<GridDhtCacheEntry> locked,
         final GridCacheVersion ver,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         final boolean replicate,
         final String taskName,
         @Nullable final IgniteCacheExpiryPolicy expiry,
@@ -1929,7 +1929,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
 
-        int size = req.keys().size();
+        int size = req.size();
 
         Map<KeyCacheObject, CacheObject> putMap = null;
 
@@ -2327,12 +2327,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private UpdateSingleResult updateSingle(
         ClusterNode node,
         boolean hasNear,
-        GridNearAtomicUpdateRequest req,
+        GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res,
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
@@ -2341,8 +2341,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridCacheReturn retVal = null;
         Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
 
-        List<KeyCacheObject> keys = req.keys();
-
         AffinityTopologyVersion topVer = req.topologyVersion();
 
         boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
@@ -2352,8 +2350,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean intercept = ctx.config().getInterceptor() != null;
 
         // Avoid iterator creation.
-        for (int i = 0; i < keys.size(); i++) {
-            KeyCacheObject k = keys.get(i);
+        for (int i = 0; i < req.size(); i++) {
+            KeyCacheObject k = req.key(i);
 
             GridCacheOperation op = req.operation();
 
@@ -2489,7 +2487,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 if (updRes.removeVersion() != null) {
                     if (deleted == null)
-                        deleted = new ArrayList<>(keys.size());
+                        deleted = new ArrayList<>(req.size());
 
                     deleted.add(F.t(entry, updRes.removeVersion()));
                 }
@@ -2565,8 +2563,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable final Collection<KeyCacheObject> rmvKeys,
         @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
-        final GridNearAtomicUpdateRequest req,
+        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
         final boolean replicate,
         final UpdateBatchResult batchRes,
@@ -2803,17 +2801,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /**
      * Acquires java-level locks on cache entries. Returns collection of locked entries.
      *
-     * @param keys Keys to lock.
+     * @param req Request with keys to lock.
      * @param topVer Topology version to lock on.
      * @return Collection of locked entries.
      * @throws GridDhtInvalidPartitionException If entry does not belong to local node. If exception is thrown,
      *      locks are released.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private List<GridDhtCacheEntry> lockEntries(List<KeyCacheObject> keys, AffinityTopologyVersion topVer)
+    private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer)
         throws GridDhtInvalidPartitionException {
-        if (keys.size() == 1) {
-            KeyCacheObject key = keys.get(0);
+        if (req.size() == 1) {
+            KeyCacheObject key = req.key(0);
 
             while (true) {
                 try {
@@ -2836,12 +2834,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         else {
-            List<GridDhtCacheEntry> locked = new ArrayList<>(keys.size());
+            List<GridDhtCacheEntry> locked = new ArrayList<>(req.size());
 
             while (true) {
-                for (KeyCacheObject key : keys) {
+                for (int i = 0; i < req.size(); i++) {
                     try {
-                        GridDhtCacheEntry entry = entryExx(key, topVer);
+                        GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
 
                         locked.add(entry);
                     }
@@ -2946,7 +2944,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      *      will return false.
      * @return {@code True} if filter evaluation succeeded.
      */
-    private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req,
+    private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res) {
         try {
             return ctx.isAllLocked(entry, req.filter());
@@ -2961,7 +2959,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /**
      * @param req Request to remap.
      */
-    private void remapToNewPrimary(GridNearAtomicUpdateRequest req) {
+    private void remapToNewPrimary(GridNearAtomicAbstractUpdateRequest req) {
         assert req.writeSynchronizationMode() == FULL_ASYNC : req;
 
         if (log.isDebugEnabled())
@@ -3040,9 +3038,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
-        GridNearAtomicUpdateRequest updateReq,
+        GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean force
     ) {
         if (!force) {
@@ -3073,7 +3071,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param nodeId Sender node ID.
      * @param req Near atomic update request.
      */
-    private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) {
+    private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
         if (msgLog.isDebugEnabled()) {
             msgLog.debug("Received near atomic update request [futId=" + req.futureVersion() +
                 ", writeVer=" + req.updateVersion() +

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4e59d11..c2ad8b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -83,7 +83,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** Completion callback. */
     @GridToStringExclude
-    private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+    private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
 
     /** Mappings. */
     @GridToStringInclude
@@ -93,7 +93,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
 
     /** Update request. */
-    private final GridNearAtomicUpdateRequest updateReq;
+    private final GridNearAtomicAbstractUpdateRequest updateReq;
 
     /** Update response. */
     private final GridNearAtomicUpdateResponse updateRes;
@@ -119,9 +119,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      */
     public GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         GridCacheVersion writeVer,
-        GridNearAtomicUpdateRequest updateReq,
+        GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes
     ) {
         this.cctx = cctx;
@@ -137,8 +137,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
         }
 
-        keys = new ArrayList<>(updateReq.keys().size());
-        mappings = U.newHashMap(updateReq.keys().size());
+        keys = new ArrayList<>(updateReq.size());
+        mappings = U.newHashMap(updateReq.size());
 
         waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
new file mode 100644
index 0000000..61deeee
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNearAtomicAbstractUpdateRequest {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private static final CacheEntryPredicate[] NO_FILTER = new CacheEntryPredicate[0];
+
+    /** Fast map flag mask. */
+    private static final int FAST_MAP_FLAG_MASK = 0x1;
+
+    /** Flag indicating whether request contains primary keys. */
+    private static final int HAS_PRIMARY_FLAG_MASK = 0x2;
+
+    /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+    private static final int TOP_LOCKED_FLAG_MASK = 0x4;
+
+    /** Skip write-through to a persistent storage. */
+    private static final int SKIP_STORE_FLAG_MASK = 0x8;
+
+    /** */
+    private static final int CLIENT_REQ_FLAG_MASK = 0x10;
+
+    /** Keep binary flag. */
+    private static final int KEEP_BINARY_FLAG_MASK = 0x20;
+
+    /** Return value flag. */
+    private static final int RET_VAL_FLAG_MASK = 0x40;
+
+    /** Target node ID. */
+    @GridDirectTransient
+    protected UUID nodeId;
+
+    /** Future version. */
+    protected GridCacheVersion futVer;
+
+    /** Update version. Set to non-null if fastMap is {@code true}. */
+    private GridCacheVersion updateVer;
+
+    /** Topology version. */
+    protected AffinityTopologyVersion topVer;
+
+    /** Write synchronization mode. */
+    protected CacheWriteSynchronizationMode syncMode;
+
+    /** Update operation. */
+    protected GridCacheOperation op;
+
+    /** Subject ID. */
+    protected UUID subjId;
+
+    /** Task name hash. */
+    protected int taskNameHash;
+
+    /** */
+    @GridDirectTransient
+    private GridNearAtomicUpdateResponse res;
+
+    /** Compressed boolean flags. */
+    protected byte flags;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    protected GridNearAtomicAbstractSingleUpdateRequest() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID.
+     * @param futVer Future version.
+     * @param fastMap Fast map scheme flag.
+     * @param updateVer Update version set if fast map is performed.
+     * @param topVer Topology version.
+     * @param topLocked Topology locked flag.
+     * @param syncMode Synchronization mode.
+     * @param op Cache update operation.
+     * @param retval Return value required flag.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip write-through to a persistent storage.
+     * @param keepBinary Keep binary flag.
+     * @param clientReq Client node request flag.
+     * @param addDepInfo Deployment info flag.
+     */
+    protected GridNearAtomicAbstractSingleUpdateRequest(
+        int cacheId,
+        UUID nodeId,
+        GridCacheVersion futVer,
+        boolean fastMap,
+        @Nullable GridCacheVersion updateVer,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean topLocked,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        boolean retval,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        boolean keepBinary,
+        boolean clientReq,
+        boolean addDepInfo
+    ) {
+        assert futVer != null;
+
+        this.cacheId = cacheId;
+        this.nodeId = nodeId;
+        this.futVer = futVer;
+        this.updateVer = updateVer;
+        this.topVer = topVer;
+        this.syncMode = syncMode;
+        this.op = op;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.addDepInfo = addDepInfo;
+
+        fastMap(fastMap);
+        topologyLocked(topLocked);
+        returnValue(retval);
+        skipStore(skipStore);
+        keepBinary(keepBinary);
+        clientRequest(clientReq);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /**
+     * @return Mapped node ID.
+     */
+    @Override public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     */
+    @Override public void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return Subject ID.
+     */
+    @Override public UUID subjectId() {
+        return subjId;
+    }
+
+    /**
+     * @return Task name hash.
+     */
+    @Override public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /**
+     * @return Future version.
+     */
+    @Override public GridCacheVersion futureVersion() {
+        return futVer;
+    }
+
+    /**
+     * @return Update version for fast-map request.
+     */
+    @Override public GridCacheVersion updateVersion() {
+        return updateVer;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Cache write synchronization mode.
+     */
+    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
+        return syncMode;
+    }
+
+    /**
+     * @return Expiry policy.
+     */
+    @Override public ExpiryPolicy expiry() {
+        return null;
+    }
+
+    /**
+     * @return Update operation.
+     */
+    @Override public GridCacheOperation operation() {
+        return op;
+    }
+
+    /**
+     * @return Optional arguments for entry processor.
+     */
+    @Override @Nullable public Object[] invokeArguments() {
+        return null;
+    }
+
+    /**
+     * @param res Response.
+     * @return {@code True} if current response was {@code null}.
+     */
+    @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
+        if (this.res == null) {
+            this.res = res;
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @return Response.
+     */
+    @Override @Nullable public GridNearAtomicUpdateResponse response() {
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+        return ctx.atomicMessageLogger();
+    }
+
+    /**
+     * @return Flag indicating whether this is fast-map update.
+     */
+    @Override public boolean fastMap() {
+        return isFlag(FAST_MAP_FLAG_MASK);
+    }
+
+    /**
+     * Sets fastMap flag value.
+     */
+    public void fastMap(boolean val) {
+        setFlag(val, FAST_MAP_FLAG_MASK);
+    }
+
+    /**
+     * @return Topology locked flag.
+     */
+    @Override public boolean topologyLocked() {
+        return isFlag(TOP_LOCKED_FLAG_MASK);
+    }
+
+    /**
+     * Sets topologyLocked flag value.
+     */
+    public void topologyLocked(boolean val) {
+        setFlag(val, TOP_LOCKED_FLAG_MASK);
+    }
+
+    /**
+     * @return {@code True} if request sent from client node.
+     */
+    @Override public boolean clientRequest() {
+        return isFlag(CLIENT_REQ_FLAG_MASK);
+    }
+
+    /**
+     * Sets clientRequest flag value.
+     */
+    public void clientRequest(boolean val) {
+        setFlag(val, CLIENT_REQ_FLAG_MASK);
+    }
+
+    /**
+     * @return Return value flag.
+     */
+    @Override public boolean returnValue() {
+        return isFlag(RET_VAL_FLAG_MASK);
+    }
+
+    /**
+     * Sets returnValue flag value.
+     */
+    public void returnValue(boolean val) {
+        setFlag(val, RET_VAL_FLAG_MASK);
+    }
+
+    /**
+     * @return Skip write-through to a persistent storage.
+     */
+    @Override public boolean skipStore() {
+        return isFlag(SKIP_STORE_FLAG_MASK);
+    }
+
+    /**
+     * Sets skipStore flag value.
+     */
+    public void skipStore(boolean val) {
+        setFlag(val, SKIP_STORE_FLAG_MASK);
+    }
+
+    /**
+     * @return Keep binary flag.
+     */
+    @Override public boolean keepBinary() {
+        return isFlag(KEEP_BINARY_FLAG_MASK);
+    }
+
+    /**
+     * Sets keepBinary flag value.
+     */
+    public void keepBinary(boolean val) {
+        setFlag(val, KEEP_BINARY_FLAG_MASK);
+    }
+
+    /**
+     * @return Flag indicating whether this request contains primary keys.
+     */
+    @Override public boolean hasPrimary() {
+        return isFlag(HAS_PRIMARY_FLAG_MASK);
+    }
+
+    /**
+     * Sets hasPrimary flag value.
+     */
+    public void hasPrimary(boolean val) {
+        setFlag(val, HAS_PRIMARY_FLAG_MASK);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public CacheEntryPredicate[] filter() {
+        return NO_FILTER;
+    }
+
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reads flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMessage("futVer", futVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                futVer = reader.readMessage("futVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                byte opOrd;
+
+                opOrd = reader.readByte("op");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                op = GridCacheOperation.fromOrdinal(opOrd);
+
+                reader.incrementState();
+
+            case 6:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                byte syncModeOrd;
+
+                syncModeOrd = reader.readByte("syncMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+                reader.incrementState();
+
+            case 8:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                updateVer = reader.readMessage("updateVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicAbstractSingleUpdateRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 11;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 85751bb..2fbabaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -36,10 +39,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
-import javax.cache.expiry.ExpiryPolicy;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 
@@ -255,11 +254,11 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @param nodeId Node ID.
      * @param req Request.
      */
-    protected void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+    protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
         if (cctx.localNodeId().equals(nodeId)) {
             cache.updateAllAsyncInternal(nodeId, req,
-                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+                new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
+                    @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
                         onResult(res.nodeId(), res, false);
                     }
                 });
@@ -303,7 +302,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @param req Request.
      * @param e Error.
      */
-    protected void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+    protected void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
         synchronized (mux) {
             GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
                 req.nodeId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
new file mode 100644
index 0000000..bee2ecd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.List;
+import java.util.UUID;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+    /** Message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    /**
+     * @return Mapped node ID.
+     */
+    public abstract UUID nodeId();
+
+    /**
+     * @param nodeId Node ID.
+     */
+    public abstract void nodeId(UUID nodeId);
+
+    /**
+     * @return Subject ID.
+     */
+    public abstract UUID subjectId();
+
+    /**
+     * @return Task name hash.
+     */
+    public abstract int taskNameHash();
+
+    /**
+     * @return Future version.
+     */
+    public abstract GridCacheVersion futureVersion();
+
+    /**
+     * @return Flag indicating whether this is fast-map update.
+     */
+    public abstract boolean fastMap();
+
+    /**
+     * @return Update version for fast-map request.
+     */
+    public abstract GridCacheVersion updateVersion();
+
+    /**
+     * @return Topology locked flag.
+     */
+    public abstract boolean topologyLocked();
+
+    /**
+     * @return {@code True} if request sent from client node.
+     */
+    public abstract boolean clientRequest();
+
+    /**
+     * @return Cache write synchronization mode.
+     */
+    public abstract CacheWriteSynchronizationMode writeSynchronizationMode();
+
+    /**
+     * @return Expiry policy.
+     */
+    public abstract ExpiryPolicy expiry();
+
+    /**
+     * @return Return value flag.
+     */
+    public abstract boolean returnValue();
+
+    /**
+     * @return Filter.
+     */
+    @Nullable public abstract CacheEntryPredicate[] filter();
+
+    /**
+     * @return Skip write-through to a persistent storage.
+     */
+    public abstract boolean skipStore();
+
+    /**
+     * @return Keep binary flag.
+     */
+    public abstract boolean keepBinary();
+
+    /**
+     * @return Update operation.
+     */
+    public abstract GridCacheOperation operation();
+
+    /**
+     * @return Optional arguments for entry processor.
+     */
+    @Nullable public abstract Object[] invokeArguments();
+
+    /**
+     * @return Flag indicating whether this request contains primary keys.
+     */
+    public abstract boolean hasPrimary();
+
+    /**
+     * @param res Response.
+     * @return {@code True} if current response was {@code null}.
+     */
+    public abstract boolean onResponse(GridNearAtomicUpdateResponse res);
+
+    /**
+     * @return Response.
+     */
+    @Nullable public abstract GridNearAtomicUpdateResponse response();
+
+    /**
+     * @param key Key to add.
+     * @param val Optional update value.
+     * @param conflictTtl Conflict TTL (optional).
+     * @param conflictExpireTime Conflict expire time (optional).
+     * @param conflictVer Conflict version (optional).
+     * @param primary If given key is primary on this mapping.
+     */
+    public abstract void addUpdateEntry(KeyCacheObject key,
+        @Nullable Object val,
+        long conflictTtl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer,
+        boolean primary);
+
+    /**
+     * @return Keys for this update request.
+     */
+    public abstract List<KeyCacheObject> keys();
+
+    /**
+     * @return Values for this update request.
+     */
+    public abstract List<?> values();
+
+    /**
+     * @param idx Key index.
+     * @return Value.
+     */
+    public abstract CacheObject value(int idx);
+
+    /**
+     * @param idx Key index.
+     * @return Entry processor.
+     */
+    public abstract EntryProcessor<Object, Object, Object> entryProcessor(int idx);
+
+    /**
+     * @param idx Index to get.
+     * @return Write value - either value, or transform closure.
+     */
+    public abstract CacheObject writeValue(int idx);
+
+
+    /**
+     * @return Conflict versions.
+     */
+    @Nullable public abstract List<GridCacheVersion> conflictVersions();
+
+    /**
+     * @param idx Index.
+     * @return Conflict version.
+     */
+    @Nullable public abstract GridCacheVersion conflictVersion(int idx);
+
+    /**
+     * @param idx Index.
+     * @return Conflict TTL.
+     */
+    public abstract long conflictTtl(int idx);
+
+    /**
+     * @param idx Index.
+     * @return Conflict expire time.
+     */
+    public abstract long conflictExpireTime(int idx);
+
+    /**
+     * Cleanup values.
+     *
+     * @param clearKeys If {@code true} clears keys.
+     */
+    public abstract void cleanup(boolean clearKeys);
+
+    /**
+     * @return Keys size.
+     */
+    public abstract int size();
+
+    /**
+     * @param idx Key index.
+     * @return Key.
+     */
+    public abstract KeyCacheObject key(int idx);
+}


Mime
View raw message