ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: Single update POC for atomic cache.
Date Tue, 10 Nov 2015 12:45:20 GMT
Single update POC for atomic cache.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d310860
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d310860
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d310860

Branch: refs/heads/ignite-1843
Commit: 1d31086038a0d8359f4bb1304a8ad8b6d9370a69
Parents: 77a3f64
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Nov 10 15:45:02 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Nov 10 15:45:02 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAtomicFuture.java |   11 +
 .../processors/cache/GridCacheMessage.java      |   18 +
 .../dht/atomic/GridDhtAtomicCache.java          |  460 +++++++-
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |  401 +++++++
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   13 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  177 ++-
 .../GridNearAtomicSingleUpdateFuture.java       | 1093 ++++++++++++++++++
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   20 +-
 .../dht/atomic/GridNearAtomicUpdateRequest.java |  187 ++-
 .../distributed/near/GridNearAtomicCache.java   |   78 +-
 10 files changed, 2326 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index be35c5c..15f004a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -18,8 +18,11 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
+import java.util.UUID;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 
 /**
  * Update future for atomic cache.
@@ -37,4 +40,12 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
      * @return Future keys.
      */
     public Collection<?> keys();
+
+    public void map();
+
+    public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse res);
+
+    public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res);
+
+    public void onResult(UUID nodeId);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index bdd2118..45a4b87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -511,6 +511,15 @@ public abstract class GridCacheMessage implements Message {
         }
     }
 
+    protected final void prepareMarshalCacheObject(@Nullable CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException {
+        if (obj != null) {
+            obj.prepareMarshal(ctx.cacheObjectContext());
+
+            if (addDepInfo)
+                prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
+        }
+    }
+
     /**
      * @param col Collection.
      * @param ctx Cache context.
@@ -556,6 +565,15 @@ public abstract class GridCacheMessage implements Message {
         }
     }
 
+    protected final void finishUnmarshalCacheObject(@Nullable CacheObject obj,
+        GridCacheContext ctx,
+        ClassLoader ldr)
+        throws IgniteCheckedException
+    {
+        if (obj != null)
+            obj.finishUnmarshal(ctx.cacheObjectContext(), ldr);
+    }
+
     /**
      * @param col Collection.
      * @param ctx Context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7f9edb2..da5cb6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -374,10 +375,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @SuppressWarnings("unchecked")
     @Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate... filter) {
         A.notNull(key, "key");
+        A.notNull(val, "val");
 
-        return updateAllAsync0(F0.asMap(key, val),
-            null,
-            null,
+        return snigleUpdateAllAsync0(key,
+            val,
             null,
             null,
             false,
@@ -776,6 +777,57 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             true);
     }
 
+    private IgniteInternalFuture snigleUpdateAllAsync0(
+        K key,
+        V val,
+        @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap,
+        @Nullable Object[] invokeArgs,
+        final boolean retval,
+        final boolean rawRetval,
+        @Nullable final CacheEntryPredicate[] filter,
+        final boolean waitTopFut
+    ) {
+        assert ctx.updatesAllowed();
+
+        if (map != null && keyCheck)
+            validateCacheKeys(map.keySet());
+
+        ctx.checkSecurity(SecurityPermission.CACHE_PUT);
+
+        CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+        UUID subjId = ctx.subjectIdPerCall(null, opCtx);
+
+        int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
+
+        final GridNearAtomicSingleUpdateFuture updateFut = new GridNearAtomicSingleUpdateFuture(
+            ctx,
+            this,
+            ctx.config().getWriteSynchronizationMode(),
+            invokeMap != null ? TRANSFORM : UPDATE,
+            key,
+            val,
+            invokeArgs,
+            retval,
+            rawRetval,
+            opCtx != null ? opCtx.expiry() : null,
+            filter,
+            subjId,
+            taskNameHash,
+            opCtx != null && opCtx.skipStore(),
+            opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+            waitTopFut);
+
+        return asyncOp(new CO<IgniteInternalFuture<Object>>() {
+            @Override public IgniteInternalFuture<Object> apply() {
+                updateFut.map();
+
+                return updateFut;
+            }
+        });
+    }
+
+
     /**
      * Entry point for all public API put/transform methods.
      *
@@ -1054,7 +1106,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final GridNearAtomicUpdateRequest req,
         final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
     ) {
-        IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
+        IgniteInternalFuture<Object> forceFut = preldr.request(req.singleUpdate() ? Collections.singleton(req.singleKey()) : req.keys(), req.topologyVersion());
 
         if (forceFut.isDone())
             updateAllAsyncInternal0(nodeId, req, completionCb);
@@ -1082,11 +1134,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
             ctx.deploymentEnabled());
 
-        List<KeyCacheObject> keys = req.keys();
+        boolean single = req.singleUpdate();
 
-        assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1);
+        assert !req.returnValue() || (req.operation() == TRANSFORM || single || req.keys().size() == 1);
 
-        GridDhtAtomicUpdateFuture dhtFut = null;
+        GridCacheAtomicFuture dhtFut = null;
 
         boolean remap = false;
 
@@ -1097,7 +1149,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         try {
             // If batch store update is enabled, we need to lock all entries.
             // First, need to acquire locks on cache entries, then check filter.
-            List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion());
+            List<GridDhtCacheEntry> locked = null;
+            GridDhtCacheEntry singleLocked = null;
+
+            if (req.singleUpdate())
+                singleLocked = lockEntry(req.singleKey(), req.topologyVersion());
+            else
+                locked = lockEntries(req.keys(), req.topologyVersion());
 
             Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
 
@@ -1106,8 +1164,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 try {
                     if (topology().stopping()) {
-                        res.addFailedKeys(keys, new IgniteCheckedException("Failed to perform cache operation " +
-                            "(cache is stopped): " + name()));
+                        res.addFailedKeys(single ? Collections.singleton(req.singleKey()) : req.keys(),
+                            new IgniteCheckedException("Failed to perform cache operation " +
+                                "(cache is stopped): " + name()));
 
                         completionCb.apply(req, res);
 
@@ -1152,7 +1211,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         GridCacheReturn retVal = null;
 
-                        if (keys.size() > 1 &&                             // Several keys ...
+                        if (single) {
+                            UpdateSingleResult updRes = updateSingleEntry(node,
+                                hasNear,
+                                req,
+                                res,
+                                singleLocked,
+                                ver,
+                                (GridDhtAtomicSingleUpdateFuture)dhtFut,
+                                completionCb,
+                                ctx.isDrEnabled(),
+                                taskName,
+                                expiry);
+
+                            retVal = updRes.returnValue();
+                            deleted = updRes.deleted();
+                            dhtFut = updRes.dhtFuture();
+                        }
+                        else if (req.keys().size() > 1 &&                             // Several keys ...
                             writeThrough() && !req.skipStore() &&          // and store is enabled ...
                             !ctx.store().isLocal() &&                      // and this is not local store ...
                             !ctx.dr().receiveEnabled()                     // and no DR.
@@ -1164,7 +1240,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 res,
                                 locked,
                                 ver,
-                                dhtFut,
+                                (GridDhtAtomicUpdateFuture)dhtFut,
                                 completionCb,
                                 ctx.isDrEnabled(),
                                 taskName,
@@ -1183,7 +1259,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 res,
                                 locked,
                                 ver,
-                                dhtFut,
+                                (GridDhtAtomicUpdateFuture)dhtFut,
                                 completionCb,
                                 ctx.isDrEnabled(),
                                 taskName,
@@ -1216,9 +1292,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 e.printStackTrace();
             }
             finally {
-                if (locked != null)
-                    unlockEntries(locked, req.topologyVersion());
+                if (single) {
+                    if (singleLocked != null)
+                        unlockSingleEntry(singleLocked, req.topologyVersion());
+                }
+                else {
+                    if (locked != null)
+                        unlockEntries(locked, req.topologyVersion());
+                }
 
+                // TODO
                 // Enqueue if necessary after locks release.
                 if (deleted != null) {
                     assert !deleted.isEmpty();
@@ -1242,7 +1325,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             // an attempt to use cleaned resources.
             U.error(log, "Unexpected exception during cache update", e);
 
-            res.addFailedKeys(keys, e);
+            res.addFailedKeys(single ? Collections.singleton(req.singleKey()) : req.keys(), e);
 
             completionCb.apply(req, res);
 
@@ -1252,7 +1335,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         if (remap) {
             assert dhtFut == null;
 
-            res.remapKeys(req.keys());
+            res.remapKeys(single ? Collections.singletonList(req.singleKey()) : req.keys());
 
             completionCb.apply(req, res);
         }
@@ -1777,7 +1860,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     taskName);
 
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
-                    dhtFut = createDhtFuture(ver, req, res, completionCb, true);
+                    dhtFut = (GridDhtAtomicUpdateFuture)createDhtFuture(ver, req, res, completionCb, true);
 
                     readersOnly = true;
                 }
@@ -1889,6 +1972,187 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     * Updates locked entries one-by-one.
+     *
+     * @param node Originating node.
+     * @param hasNear {@code True} if originating node has near cache.
+     * @param req Update request.
+     * @param res Update response.
+     * @param entry Locked entry.
+     * @param ver Assigned update version.
+     * @param dhtFut Optional DHT future.
+     * @param completionCb Completion callback to invoke when DHT future is completed.
+     * @param replicate Whether DR is enabled for that cache.
+     * @param taskName Task name.
+     * @param expiry Expiry policy.
+     * @return Return value.
+     * @throws GridCacheEntryRemovedException Should be never thrown.
+     */
+    private UpdateSingleResult updateSingleEntry(
+        ClusterNode node,
+        boolean hasNear,
+        GridNearAtomicUpdateRequest req,
+        GridNearAtomicUpdateResponse res,
+        GridDhtCacheEntry entry,
+        GridCacheVersion ver,
+        @Nullable GridDhtAtomicSingleUpdateFuture dhtFut,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        boolean replicate,
+        String taskName,
+        @Nullable IgniteCacheExpiryPolicy expiry
+    ) throws GridCacheEntryRemovedException {
+        GridCacheReturn retVal = null;
+        Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
+
+        KeyCacheObject k = req.singleKey();
+
+        AffinityTopologyVersion topVer = req.topologyVersion();
+
+        boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+
+        boolean readersOnly = false;
+
+        boolean intercept = ctx.config().getInterceptor() != null;
+
+        GridCacheOperation op = req.operation();
+
+        // We are holding java-level locks on entries at this point.
+        // No GridCacheEntryRemovedException can be thrown.
+        try {
+            if (entry == null)
+                return new UpdateSingleResult(retVal, deleted, dhtFut);;
+
+            boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(),
+                req.topologyVersion());
+
+            Object writeVal = op == TRANSFORM ? req.singleEntryProcessor() : req.singleWriteValue();
+
+            Collection<UUID> readers = null;
+            Collection<UUID> filteredReaders = null;
+
+            if (checkReaders) {
+                readers = entry.readers();
+                filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+            }
+
+            GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+                ver,
+                node.id(),
+                locNodeId,
+                op,
+                writeVal,
+                req.invokeArguments(),
+                primary && writeThrough() && !req.skipStore(),
+                !req.skipStore(),
+                req.returnValue(),
+                expiry,
+                true,
+                true,
+                primary,
+                ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
+                topVer,
+                req.filter(),
+                replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+                CU.TTL_NOT_CHANGED,
+                CU.EXPIRE_TIME_CALCULATE,
+                null,
+                true,
+                intercept,
+                req.subjectId(),
+                taskName);
+
+            if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+                dhtFut = (GridDhtAtomicSingleUpdateFuture)createDhtFuture(ver, req, res, completionCb, true);
+
+                readersOnly = true;
+            }
+
+            if (dhtFut != null) {
+                if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
+                    assert updRes.conflictResolveResult() == null : updRes;
+
+                    if (!readersOnly) {
+                        dhtFut.addWriteEntry(entry,
+                            updRes.newValue(),
+                            updRes.newTtl());
+                    }
+
+                    if (!F.isEmpty(filteredReaders))
+                        dhtFut.addNearWriteEntries(filteredReaders,
+                            entry,
+                            updRes.newValue(),
+                            updRes.newTtl());
+                }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Entry did not pass the filter or conflict resolution (will skip write) " +
+                            "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
+                }
+            }
+
+            if (hasNear) {
+                if (primary && updRes.sendToDht()) {
+                    if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
+                        // If put the same value as in request then do not need to send it back.
+                        if (op == TRANSFORM || writeVal != updRes.newValue()) {
+                            res.addNearValue(0,
+                                updRes.newValue(),
+                                updRes.newTtl(),
+                                updRes.conflictExpireTime());
+                        }
+                        else
+                            res.addNearTtl(0, updRes.newTtl(), updRes.conflictExpireTime());
+
+                        if (updRes.newValue() != null) {
+                            IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
+
+                            assert f == null : f;
+                        }
+                    }
+                    else if (F.contains(readers, node.id())) // Reader became primary or backup.
+                        entry.removeReader(node.id(), req.messageId());
+                    else
+                        res.addSkippedIndex(0);
+                }
+                else
+                    res.addSkippedIndex(0);
+            }
+
+            if (updRes.removeVersion() != null)
+                deleted = Collections.singleton(F.t(entry, updRes.removeVersion()));
+
+            if (op == TRANSFORM) {
+                assert !req.returnValue();
+
+                IgniteBiTuple<Object, Exception> compRes = updRes.computedResult();
+
+                if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) {
+                    retVal = new GridCacheReturn(node.isLocal());
+
+                    retVal.addEntryProcessResult(ctx,
+                        k,
+                        null,
+                        compRes.get1(),
+                        compRes.get2());
+                }
+            }
+            else {
+                CacheObject ret = updRes.oldValue();
+
+                retVal = new GridCacheReturn(ctx,
+                    node.isLocal(),
+                    req.returnValue() ? ret : null,
+                    updRes.success());
+            }
+        }
+        catch (IgniteCheckedException e) {
+            res.addFailedKey(k, e);
+        }
+
+        return new UpdateSingleResult(retVal, deleted, dhtFut);
+    }
+
+    /**
      * @param hasNear {@code True} if originating node has near cache.
      * @param firstEntryIdx Index of the first entry in the request keys collection.
      * @param entries Entries to update.
@@ -2067,7 +2331,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     batchRes.addDeleted(entry, updRes, entries);
 
                     if (dhtFut == null && !F.isEmpty(filteredReaders)) {
-                        dhtFut = createDhtFuture(ver, req, res, completionCb, true);
+                        dhtFut = (GridDhtAtomicUpdateFuture)createDhtFuture(ver, req, res, completionCb, true);
 
                         batchRes.readersOnly(true);
                     }
@@ -2139,6 +2403,29 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         return dhtFut;
     }
 
+    private GridDhtCacheEntry lockEntry(KeyCacheObject key, AffinityTopologyVersion topVer)
+        throws GridDhtInvalidPartitionException {
+        while (true) {
+            try {
+                GridDhtCacheEntry entry = entryExx(key, topVer);
+
+                UNSAFE.monitorEnter(entry);
+
+                if (entry.obsolete())
+                    UNSAFE.monitorExit(entry);
+                else
+                    return entry;
+            }
+            catch (GridDhtInvalidPartitionException e) {
+                // Ignore invalid partition exception in CLOCK ordering mode.
+                if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+                    return null;
+                else
+                    throw e;
+            }
+        }
+    }
+
     /**
      * Acquires java-level locks on cache entries. Returns collection of locked entries.
      *
@@ -2226,6 +2513,30 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
     }
 
+    private void unlockSingleEntry(GridDhtCacheEntry entry, AffinityTopologyVersion topVer) {
+        // Process deleted entries before locks release.
+        assert ctx.deferredDelete(this) : this;
+
+        boolean skip = false;
+
+        try {
+            if (entry.deleted())
+                skip = true;
+        }
+        finally {
+            UNSAFE.monitorExit(entry);
+        }
+
+        entry.onUnlock();
+
+        if (skip)
+            return;
+
+        // Must touch all entries since update may have deleted entries.
+        // Eviction manager will remove empty entries.
+        ctx.evicts().touch(entry, topVer);
+    }
+
     /**
      * Releases java-level locks on cache entries.
      *
@@ -2376,7 +2687,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param force If {@code true} then creates future without optimizations checks.
      * @return Backup update future or {@code null} if there are no backups.
      */
-    @Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
+    @Nullable private GridCacheAtomicFuture createDhtFuture(
         GridCacheVersion writeVer,
         GridNearAtomicUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes,
@@ -2404,7 +2715,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
 
-        return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+        return updateReq.singleUpdate() ?
+            new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes) :
+            new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
     }
 
     /**
@@ -2452,7 +2765,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         res.nodeId(ctx.localNodeId());
 
-        GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+        GridCacheAtomicFuture fut = (GridCacheAtomicFuture)ctx.mvcc().atomicFuture(res.futureVersion());
 
         if (fut != null)
             fut.onResult(nodeId, res);
@@ -2481,8 +2794,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
-        for (int i = 0; i < req.size(); i++) {
-            KeyCacheObject key = req.key(i);
+        if (req.singleUpdate()) {
+            KeyCacheObject key = req.singleKey();
 
             try {
                 while (true) {
@@ -2491,22 +2804,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     try {
                         entry = entryExx(key);
 
-                        CacheObject val = req.value(i);
-                        EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+                        CacheObject val = req.singleValue();
 
-                        GridCacheOperation op = entryProcessor != null ? TRANSFORM :
-                            (val != null) ? UPDATE : DELETE;
+                        GridCacheOperation op = (val != null) ? UPDATE : DELETE;
 
-                        long ttl = req.ttl(i);
-                        long expireTime = req.conflictExpireTime(i);
+                        long ttl = req.ttl(0);
+                        long expireTime = req.conflictExpireTime(0);
 
                         GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                             ver,
                             nodeId,
                             nodeId,
                             op,
-                            op == TRANSFORM ? entryProcessor : val,
-                            op == TRANSFORM ? req.invokeArguments() : null,
+                            val,
+                            null,
                             /*write-through*/false,
                             /*read-through*/false,
                             /*retval*/false,
@@ -2520,7 +2831,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             replicate ? DR_BACKUP : DR_NONE,
                             ttl,
                             expireTime,
-                            req.conflictVersion(i),
+                            null,
                             false,
                             intercept,
                             req.subjectId(),
@@ -2552,6 +2863,79 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
             }
         }
+        else {
+            for (int i = 0; i < req.size(); i++) {
+                KeyCacheObject key = req.key(i);
+
+                try {
+                    while (true) {
+                        GridDhtCacheEntry entry = null;
+
+                        try {
+                            entry = entryExx(key);
+
+                            CacheObject val = req.value(i);
+                            EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+
+                            GridCacheOperation op = entryProcessor != null ? TRANSFORM :
+                                (val != null) ? UPDATE : DELETE;
+
+                            long ttl = req.ttl(i);
+                            long expireTime = req.conflictExpireTime(i);
+
+                            GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+                                ver,
+                                nodeId,
+                                nodeId,
+                                op,
+                                op == TRANSFORM ? entryProcessor : val,
+                                op == TRANSFORM ? req.invokeArguments() : null,
+                            /*write-through*/false,
+                            /*read-through*/false,
+                            /*retval*/false,
+                            /*expiry policy*/null,
+                            /*event*/true,
+                            /*metrics*/true,
+                            /*primary*/false,
+                            /*check version*/!req.forceTransformBackups(),
+                                req.topologyVersion(),
+                                CU.empty0(),
+                                replicate ? DR_BACKUP : DR_NONE,
+                                ttl,
+                                expireTime,
+                                req.conflictVersion(i),
+                                false,
+                                intercept,
+                                req.subjectId(),
+                                taskName);
+
+                            if (updRes.removeVersion() != null)
+                                ctx.onDeferredDelete(entry, updRes.removeVersion());
+
+                            entry.onUnlock();
+
+                            break; // While.
+                        }
+                        catch (GridCacheEntryRemovedException ignored) {
+                            if (log.isDebugEnabled())
+                                log.debug("Got removed entry while updating backup value (will retry): " + key);
+
+                            entry = null;
+                        }
+                        finally {
+                            if (entry != null)
+                                ctx.evicts().touch(entry, req.topologyVersion());
+                        }
+                    }
+                }
+                catch (GridDhtInvalidPartitionException ignored) {
+                    // Ignore.
+                }
+                catch (IgniteCheckedException e) {
+                    res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
+                }
+            }
+        }
 
         if (isNearEnabled(cacheCfg))
             ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res);
@@ -2612,7 +2996,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         if (log.isDebugEnabled())
             log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']');
 
-        GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().
+        GridCacheAtomicFuture updateFut = (GridCacheAtomicFuture)ctx.mvcc().
             atomicFuture(res.futureVersion());
 
         if (updateFut != null)
@@ -2632,7 +3016,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             log.debug("Processing deferred dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']');
 
         for (GridCacheVersion ver : res.futureVersions()) {
-            GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver);
+            GridCacheAtomicFuture updateFut = (GridCacheAtomicFuture)ctx.mvcc().atomicFuture(ver);
 
             if (updateFut != null)
                 updateFut.onResult(nodeId);
@@ -2676,7 +3060,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private final Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted;
 
         /** */
-        private final GridDhtAtomicUpdateFuture dhtFut;
+        private final GridCacheAtomicFuture dhtFut;
 
         /**
          * @param retVal Return value.
@@ -2685,7 +3069,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
          */
         private UpdateSingleResult(GridCacheReturn retVal,
             Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted,
-            GridDhtAtomicUpdateFuture dhtFut) {
+            GridCacheAtomicFuture dhtFut) {
             this.retVal = retVal;
             this.deleted = deleted;
             this.dhtFut = dhtFut;
@@ -2708,7 +3092,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         /**
          * @return DHT future.
          */
-        public GridDhtAtomicUpdateFuture dhtFuture() {
+        public GridCacheAtomicFuture dhtFuture() {
             return dhtFut;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
new file mode 100644
index 0000000..f31dda2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * DHT atomic cache backup update future.
+ */
+public class GridDhtAtomicSingleUpdateFuture extends GridFutureAdapter<Void>
+    implements GridCacheAtomicFuture<Void> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger. */
+    protected static IgniteLogger log;
+
+    /** Cache context. */
+    private GridCacheContext cctx;
+
+    /** Future version. */
+    private GridCacheVersion futVer;
+
+    /** Write version. */
+    private GridCacheVersion writeVer;
+
+    /** Completion callback. */
+    @GridToStringExclude
+    private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+
+    /** Mappings. */
+    @GridToStringInclude
+    private ConcurrentMap<UUID, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
+
+    /** Entries with readers. */
+    private GridDhtCacheEntry nearReaderEntry;
+
+    /** Update request. */
+    private GridNearAtomicUpdateRequest updateReq;
+
+    /** Update response. */
+    private GridNearAtomicUpdateResponse updateRes;
+
+    /** */
+    private boolean waitForExchange;
+
+    /**
+     * @param cctx Cache context.
+     * @param completionCb Callback to invoke when future is completed.
+     * @param writeVer Write version.
+     * @param updateReq Update request.
+     * @param updateRes Update response.
+     */
+    public GridDhtAtomicSingleUpdateFuture(
+        GridCacheContext cctx,
+        CI2<GridNearAtomicUpdateRequest,
+            GridNearAtomicUpdateResponse> completionCb,
+        GridCacheVersion writeVer,
+        GridNearAtomicUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes
+    ) {
+        assert updateReq.singleUpdate() : updateReq;
+
+        this.cctx = cctx;
+        this.writeVer = writeVer;
+
+        futVer = cctx.versions().next(updateReq.topologyVersion());
+        this.updateReq = updateReq;
+        this.completionCb = completionCb;
+        this.updateRes = updateRes;
+
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicSingleUpdateFuture.class);
+
+        boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
+
+        waitForExchange = !topLocked;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        return futVer.asGridUuid();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion version() {
+        return futVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<? extends ClusterNode> nodes() {
+        return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        if (log.isDebugEnabled())
+            log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']');
+
+        GridDhtAtomicUpdateRequest req = mappings.get(nodeId);
+
+        if (req != null) {
+            // Remove only after added keys to failed set.
+            mappings.remove(nodeId);
+
+            checkComplete();
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+        if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
+            return this;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<KeyCacheObject> keys() {
+        return Collections.singleton(updateReq.singleKey());
+    }
+
+    /**
+     * @param entry Entry to map.
+     * @param val Value to write.
+     * @param ttl TTL (optional).
+     */
+    public void addWriteEntry(GridDhtCacheEntry entry,
+        @Nullable CacheObject val,
+        long ttl) {
+        AffinityTopologyVersion topVer = updateReq.topologyVersion();
+
+        Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
+
+        if (log.isDebugEnabled())
+            log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
+
+        CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
+
+        for (ClusterNode node : dhtNodes) {
+            UUID nodeId = node.id();
+
+            if (!nodeId.equals(cctx.localNodeId())) {
+                GridDhtAtomicUpdateRequest updateReq = new GridDhtAtomicUpdateRequest(
+                    cctx.cacheId(),
+                    nodeId,
+                    futVer,
+                    writeVer,
+                    syncMode,
+                    topVer,
+                    false,
+                    this.updateReq.subjectId(),
+                    this.updateReq.taskNameHash(),
+                    null,
+                    cctx.deploymentEnabled(),
+                    true);
+
+                mappings.put(nodeId, updateReq);
+
+                updateReq.addSingleWriteValue(entry.key(),
+                    val,
+                    ttl);
+            }
+        }
+    }
+
+    /**
+     * @param readers Entry readers.
+     * @param entry Entry.
+     * @param val Value.
+     * @param ttl TTL for near cache update (optional).
+     */
+    public void addNearWriteEntries(Iterable<UUID> readers,
+        GridDhtCacheEntry entry,
+        @Nullable CacheObject val,
+        long ttl) {
+        CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
+
+        AffinityTopologyVersion topVer = updateReq.topologyVersion();
+
+        for (UUID nodeId : readers) {
+            GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+
+
+            ClusterNode node = cctx.discovery().node(nodeId);
+
+            // Node left the grid.
+            if (node == null)
+                continue;
+
+            if (updateReq == null) {
+                updateReq = new GridDhtAtomicUpdateRequest(
+                    cctx.cacheId(),
+                    nodeId,
+                    futVer,
+                    writeVer,
+                    syncMode,
+                    topVer,
+                    false,
+                    this.updateReq.subjectId(),
+                    this.updateReq.taskNameHash(),
+                    null,
+                    cctx.deploymentEnabled(),
+                    false);
+
+                mappings.put(nodeId, updateReq);
+            }
+
+            nearReaderEntry = entry;
+
+            updateReq.addNearWriteValue(entry.key(),
+                val,
+                null,
+                ttl,
+                -1L);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+        if (super.onDone(res, err)) {
+            cctx.mvcc().removeAtomicFuture(version());
+
+            if (err != null)
+                updateRes.addFailedKey(updateReq.singleKey(), err);
+
+            if (updateReq.writeSynchronizationMode() == FULL_SYNC)
+                completionCb.apply(updateReq, updateRes);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Sends requests to remote nodes.
+     */
+    public void map() {
+        if (!mappings.isEmpty()) {
+            for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+                try {
+                    if (log.isDebugEnabled())
+                        log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+
+                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                }
+                catch (ClusterTopologyCheckedException ignored) {
+                    U.warn(log, "Failed to send update request to backup node because it left grid: " +
+                        req.nodeId());
+
+                    mappings.remove(req.nodeId());
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send update request to backup node (did node leave the grid?): "
+                        + req.nodeId(), e);
+
+                    mappings.remove(req.nodeId());
+                }
+            }
+        }
+
+        checkComplete();
+
+        // Send response right away if no ACKs from backup is required.
+        // Backups will send ACKs anyway, future will be completed after all backups have replied.
+        if (updateReq.writeSynchronizationMode() != FULL_SYNC)
+            completionCb.apply(updateReq, updateRes);
+    }
+
+    /**
+     * Callback for backup update response.
+     *
+     * @param nodeId Backup node ID.
+     * @param updateRes Update response.
+     */
+    public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
+        if (log.isDebugEnabled())
+            log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']');
+
+        if (updateRes.error() != null)
+            this.updateRes.addFailedKeys(updateRes.failedKeys(), updateRes.error());
+
+        if (!F.isEmpty(updateRes.nearEvicted())) {
+            assert nearReaderEntry != null;
+
+            try {
+                nearReaderEntry.removeReader(nodeId, updateRes.messageId());
+            }
+            catch (GridCacheEntryRemovedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Entry with evicted reader was removed [entry=" + nearReaderEntry + ", err=" + e + ']');
+            }
+        }
+
+        mappings.remove(nodeId);
+
+        checkComplete();
+    }
+
+    @Override
+    public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+        assert false;
+
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Deferred update response.
+     *
+     * @param nodeId Backup node ID.
+     */
+    public void onResult(UUID nodeId) {
+        if (log.isDebugEnabled())
+            log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
+
+        mappings.remove(nodeId);
+
+        checkComplete();
+    }
+
+    /**
+     * Checks if all required responses are received.
+     */
+    private void checkComplete() {
+        // Always wait for replies from all backups.
+        if (mappings.isEmpty()) {
+            if (log.isDebugEnabled())
+                log.debug("Completing DHT atomic update future: " + this);
+
+            onDone();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtAtomicSingleUpdateFuture.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4ace5c4..f0635e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -234,7 +234,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                         this.updateReq.subjectId(),
                         this.updateReq.taskNameHash(),
                         forceTransformBackups ? this.updateReq.invokeArguments() : null,
-                        cctx.deploymentEnabled());
+                        cctx.deploymentEnabled(),
+                        false);
 
                     mappings.put(nodeId, updateReq);
                 }
@@ -290,7 +291,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                     this.updateReq.subjectId(),
                     this.updateReq.taskNameHash(),
                     forceTransformBackups ? this.updateReq.invokeArguments() : null,
-                    cctx.deploymentEnabled());
+                    cctx.deploymentEnabled(),
+                    false);
 
                 mappings.put(nodeId, updateReq);
             }
@@ -394,6 +396,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         checkComplete();
     }
 
+    @Override
+    public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+        assert false;
+
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Deferred update response.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index e55cac9..242c373 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -78,6 +78,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     @GridDirectCollection(CacheObject.class)
     private List<CacheObject> vals;
 
+    private KeyCacheObject singleKey;
+
+    private CacheObject singleVal;
+
     /** Conflict versions. */
     @GridDirectCollection(GridCacheVersion.class)
     private List<GridCacheVersion> conflictVers;
@@ -172,7 +176,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         UUID subjId,
         int taskNameHash,
         Object[] invokeArgs,
-        boolean addDepInfo
+        boolean addDepInfo,
+        boolean single
     ) {
         assert invokeArgs == null || forceTransformBackups;
 
@@ -188,16 +193,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         this.invokeArgs = invokeArgs;
         this.addDepInfo = addDepInfo;
 
-        keys = new ArrayList<>();
+        if (!single) {
+            keys = new ArrayList<>();
 
-        if (forceTransformBackups) {
-            entryProcessors = new ArrayList<>();
-            entryProcessorsBytes = new ArrayList<>();
+            if (forceTransformBackups) {
+                entryProcessors = new ArrayList<>();
+                entryProcessorsBytes = new ArrayList<>();
+            }
+            else
+                vals = new ArrayList<>();
         }
-        else
-            vals = new ArrayList<>();
     }
 
+
+
     /**
      * @return Force transform backups flag.
      */
@@ -205,6 +214,36 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         return forceTransformBackups;
     }
 
+    public KeyCacheObject singleKey() {
+        assert singleKey != null;
+
+        return singleKey;
+    }
+
+    public CacheObject singleValue() {
+        return singleVal;
+    }
+
+    public boolean singleUpdate() {
+        return singleKey != null;
+    }
+
+    public void addSingleWriteValue(KeyCacheObject key,
+        @Nullable CacheObject val,
+        long ttl) {
+        assert !forceTransformBackups;
+
+        singleKey = key;
+        singleVal = val;
+
+        if (ttl >= 0) {
+            if (ttls == null)
+                ttls = new GridLongList(1);
+
+            ttls.add(ttl);
+        }
+    }
+
     /**
      * @param key Key to add.
      * @param val Value, {@code null} if should be removed.
@@ -276,10 +315,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @param expireTime Expire time.
      */
     public void addNearWriteValue(KeyCacheObject key,
-        @Nullable CacheObject val,
-        EntryProcessor<Object, Object, Object> entryProcessor,
-        long ttl,
-        long expireTime)
+                                  @Nullable CacheObject val,
+                                  EntryProcessor<Object, Object, Object> entryProcessor,
+                                  long ttl,
+                                  long expireTime)
     {
         if (nearKeys == null) {
             nearKeys = new ArrayList<>();
@@ -540,24 +579,37 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        prepareMarshalCacheObjects(keys, cctx);
+        if (singleKey != null) {
+            assert !forceTransformBackups;
 
-        prepareMarshalCacheObjects(vals, cctx);
+            prepareMarshalCacheObject(singleKey, cctx);
 
-        prepareMarshalCacheObjects(nearKeys, cctx);
+            prepareMarshalCacheObject(singleVal, cctx);
 
-        prepareMarshalCacheObjects(nearVals, cctx);
+            prepareMarshalCacheObjects(nearKeys, cctx);
 
-        if (forceTransformBackups) {
-            // force addition of deployment info for entry processors if P2P is enabled globally.
-            if (!addDepInfo && ctx.deploymentEnabled())
-                addDepInfo = true;
+            prepareMarshalCacheObjects(nearVals, cctx);
+        }
+        else {
+            prepareMarshalCacheObjects(keys, cctx);
+
+            prepareMarshalCacheObjects(vals, cctx);
+
+            prepareMarshalCacheObjects(nearKeys, cctx);
+
+            prepareMarshalCacheObjects(nearVals, cctx);
 
-            invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+            if (forceTransformBackups) {
+                // force addition of deployment info for entry processors if P2P is enabled globally.
+                if (!addDepInfo && ctx.deploymentEnabled())
+                    addDepInfo = true;
+
+                invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
 
-            entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+                entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
 
-            nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
+                nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
+            }
         }
     }
 
@@ -567,22 +619,29 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        finishUnmarshalCacheObjects(keys, cctx, ldr);
+        if (singleKey != null) {
+            finishUnmarshalCacheObject(singleKey, cctx, ldr);
 
-        finishUnmarshalCacheObjects(vals, cctx, ldr);
+            finishUnmarshalCacheObject(singleVal, cctx, ldr);
+        }
+        else {
+            finishUnmarshalCacheObjects(keys, cctx, ldr);
 
-        finishUnmarshalCacheObjects(nearKeys, cctx, ldr);
+            finishUnmarshalCacheObjects(vals, cctx, ldr);
 
-        finishUnmarshalCacheObjects(nearVals, cctx, ldr);
+            finishUnmarshalCacheObjects(nearKeys, cctx, ldr);
 
-        if (forceTransformBackups) {
-            entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+            finishUnmarshalCacheObjects(nearVals, cctx, ldr);
 
-            invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
-        }
+            if (forceTransformBackups) {
+                entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+                invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+            }
 
-        if (forceTransformBackups)
-            nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
+            if (forceTransformBackups)
+                nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
+        }
     }
 
     /** {@inheritDoc} */
@@ -684,42 +743,54 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeMessage("singleKey", singleKey))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeMessage("singleVal", singleVal))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeMessage("ttls", ttls))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 22:
+                if (!writer.writeMessage("ttls", ttls))
+                    return false;
+
+                writer.incrementState();
+
+            case 23:
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 24:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -846,7 +917,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 16:
-                subjId = reader.readUuid("subjId");
+                singleKey = reader.readMessage("singleKey");
 
                 if (!reader.isLastRead())
                     return false;
@@ -854,6 +925,22 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 17:
+                singleVal = reader.readMessage("singleVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 18:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 19:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -865,7 +952,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 18:
+            case 20:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -873,7 +960,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 19:
+            case 21:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -881,7 +968,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 20:
+            case 22:
                 ttls = reader.readMessage("ttls");
 
                 if (!reader.isLastRead())
@@ -889,7 +976,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 21:
+            case 23:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -897,7 +984,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 22:
+            case 24:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -917,7 +1004,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 23;
+        return 25;
     }
 
     /** {@inheritDoc} */


Mime
View raw message