ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: Restore clock mode.
Date Wed, 22 Mar 2017 13:31:05 GMT
Restore clock mode.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0015962a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0015962a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0015962a

Branch: refs/heads/ignite-2.0-clock
Commit: 0015962a43cc48d5b7857503a0f84a43518cbd7f
Parents: 827befb
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Mar 22 15:20:14 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Mar 22 16:30:49 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |   12 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |   62 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  544 +++++++---
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |    6 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |    6 +-
 ...idNearAtomicAbstractSingleUpdateRequest.java |    4 +
 .../GridNearAtomicAbstractUpdateFuture.java     |   72 +-
 .../GridNearAtomicAbstractUpdateRequest.java    |  120 +-
 .../GridNearAtomicFastMapUpdateFuture.java      | 1026 ++++++++++++++++++
 .../atomic/GridNearAtomicFullUpdateRequest.java |   48 +-
 ...GridNearAtomicSingleUpdateFilterRequest.java |   10 +-
 .../GridNearAtomicSingleUpdateFuture.java       |   23 +-
 ...GridNearAtomicSingleUpdateInvokeRequest.java |   17 +-
 .../GridNearAtomicSingleUpdateRequest.java      |   25 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   29 +-
 .../distributed/near/GridNearAtomicCache.java   |    5 +-
 .../GridCacheAtomicMessageCountSelfTest.java    |   14 +
 .../atomic/IgniteCacheAtomicProtocolTest.java   |    2 +
 18 files changed, 1745 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 54b4ed7..2237e22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2170,6 +2170,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                     assert conflictCtx != null;
 
+                    boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
                     // Use old value?
                     if (conflictCtx.isUseOld()) {
                         GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
@@ -2178,7 +2180,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         if (!isNew() &&                                                                       // Not initial value,
                             verCheck &&                                                                       // and atomic version check,
                             oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() &&                 // and data centers are equal,
-                            ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, true) == 0 && // and both versions are equal,
+                            ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
                             cctx.writeThrough() &&                                                            // and store is enabled,
                             primary)                                                                          // and we are primary.
                         {
@@ -2224,11 +2226,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     conflictVer = null;
             }
 
+            boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
             // Perform version check only in case there was no explicit conflict resolution.
             if (conflictCtx == null) {
                 if (verCheck) {
-                    if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) >= 0) {
-                        if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) == 0 && cctx.writeThrough() && primary) {
+                    if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) >= 0) {
+                        if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
                             if (log.isDebugEnabled())
                                 log.debug("Received entry update with same version as current (will update store) " +
                                     "[entry=" + this + ", newVer=" + newVer + ']');
@@ -2303,7 +2307,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     }
                 }
                 else
-                    assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) <= 0 :
+                    assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 :
                         "Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5ff5aa4..219b04a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -85,6 +85,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     /** Update request. */
     final GridNearAtomicAbstractUpdateRequest updateReq;
 
+    /** Update response. */
+    @GridToStringExclude
+    private final GridNearAtomicUpdateResponse updateRes;
+
+    /** Completion callback. */
+    @GridToStringExclude
+    private final GridDhtAtomicCache.UpdateReplyClosure completionCb;
+
     /** Mappings. */
     @GridToStringExclude
     protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings;
@@ -99,16 +107,22 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @param cctx Cache context.
      * @param writeVer Write version.
      * @param updateReq Update request.
+     * @param updateRes Response.
+     * @param completionCb Callback to invoke to send response to near node.
      */
     protected GridDhtAtomicAbstractUpdateFuture(
         GridCacheContext cctx,
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
-    ) {
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb)
+    {
         this.cctx = cctx;
 
-        this.updateReq = updateReq;
         this.writeVer = writeVer;
+        this.updateReq = updateReq;
+        this.updateRes = updateRes;
+        this.completionCb = completionCb;
 
         futId = cctx.mvcc().atomicFutureId();
 
@@ -354,13 +368,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      *
      * @param nearNode Near node.
      * @param ret Cache operation return value.
-     * @param updateRes Response.
-     * @param completionCb Callback to invoke to send response to near node.
      */
-    final void map(ClusterNode nearNode,
-        GridCacheReturn ret,
-        GridNearAtomicUpdateResponse updateRes,
-        GridDhtAtomicCache.UpdateReplyClosure completionCb) {
+    final void map(ClusterNode nearNode, GridCacheReturn ret) {
         if (F.isEmpty(mappings)) {
             updateRes.dhtNodes(Collections.<UUID>emptyList());
 
@@ -371,23 +380,27 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
             return;
         }
 
-        boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
-            !ret.emptyResult() ||
-            updateRes.nearVersion() != null ||
-            cctx.localNodeId().equals(nearNode.id());
+        if (!updateReq.fastMap()) {
+            boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
+                !ret.emptyResult() ||
+                updateRes.nearVersion() != null ||
+                cctx.localNodeId().equals(nearNode.id());
 
-        boolean needMapping = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht());
+            boolean needMapping = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht());
 
-        if (needMapping) {
-            initMapping(updateRes);
+            if (needMapping) {
+                initMapping(updateRes);
 
-            needReplyToNear = true;
-        }
+                needReplyToNear = true;
+            }
 
-        sendDhtRequests(nearNode, ret);
+            sendDhtRequests(nearNode, ret);
 
-        if (needReplyToNear)
-            completionCb.apply(updateReq, updateRes);
+            if (needReplyToNear)
+                completionCb.apply(updateReq, updateRes);
+        }
+        else
+            sendDhtRequests(nearNode, ret);
     }
 
     /**
@@ -416,14 +429,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
             try {
                 assert !cctx.localNodeId().equals(req.nodeId()) : req;
 
-                if (updateReq.fullSync()) {
+                if (!updateReq.fastMap() && updateReq.fullSync()) {
                     req.nearReplyInfo(nearNode.id(), updateReq.futureId());
 
                     if (ret.emptyResult())
                         req.hasResult(true);
                 }
 
-                if (cntQryClsrs != null)
+                if (cntQryClsrs != null || updateReq.fastMap())
                     req.replyWithoutDelay(true);
 
                 cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
@@ -519,6 +532,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     clsr.apply(suc);
             }
 
+            if (updateReq.fastMap())
+                completionCb.apply(updateReq, updateRes);
+
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c20ed48..3735824 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -106,7 +107,9 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
@@ -1115,27 +1118,54 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
 
-        final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
-            ctx,
-            this,
-            ctx.config().getWriteSynchronizationMode(),
-            op,
-            map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
-                conflictPutMap.keySet() : conflictRmvMap.keySet(),
-            map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
-            invokeArgs,
-            (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
-            conflictRmvMap != null ? conflictRmvMap.values() : null,
-            retval,
-            rawRetval,
-            opCtx != null ? opCtx.expiry() : null,
-            CU.filterArray(null),
-            subjId,
-            taskNameHash,
-            opCtx != null && opCtx.skipStore(),
-            opCtx != null && opCtx.isKeepBinary(),
-            opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
-            waitTopFut);
+        final GridNearAtomicAbstractUpdateFuture updateFut;
+
+        if (isFastMap(null, op)) {
+            updateFut = new GridNearAtomicFastMapUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                op,
+                map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
+                    conflictPutMap.keySet() : conflictRmvMap.keySet(),
+                map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
+                invokeArgs,
+                (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
+                conflictRmvMap != null ? conflictRmvMap.values() : null,
+                retval,
+                rawRetval,
+                opCtx != null ? opCtx.expiry() : null,
+                CU.filterArray(null),
+                subjId,
+                taskNameHash,
+                opCtx != null && opCtx.skipStore(),
+                opCtx != null && opCtx.isKeepBinary(),
+                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                waitTopFut);
+        }
+        else {
+            updateFut = new GridNearAtomicUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                op,
+                map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
+                    conflictPutMap.keySet() : conflictRmvMap.keySet(),
+                map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
+                invokeArgs,
+                (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
+                conflictRmvMap != null ? conflictRmvMap.values() : null,
+                retval,
+                rawRetval,
+                opCtx != null ? opCtx.expiry() : null,
+                CU.filterArray(null),
+                subjId,
+                taskNameHash,
+                opCtx != null && opCtx.skipStore(),
+                opCtx != null && opCtx.isKeepBinary(),
+                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                waitTopFut);
+        }
 
         if (async) {
             return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@@ -1306,7 +1336,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         CacheEntryPredicate[] filters = CU.filterArray(filter);
 
-        if (conflictPutVal == null && conflictRmvVer == null) {
+        if (conflictPutVal == null &&
+            conflictRmvVer == null &&
+            !isFastMap(filters, op)) {
             return new GridNearAtomicSingleUpdateFuture(
                 ctx,
                 this,
@@ -1328,30 +1360,67 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             );
         }
         else {
-            return new GridNearAtomicUpdateFuture(
-                ctx,
-                this,
-                ctx.config().getWriteSynchronizationMode(),
-                op,
-                Collections.singletonList(key),
-                val0 != null ? Collections.singletonList(val0) : null,
-                invokeArgs,
-                conflictPutVal != null ? Collections.singleton(conflictPutVal) : null,
-                conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null,
-                retval,
-                false,
-                opCtx != null ? opCtx.expiry() : null,
-                filters,
-                ctx.subjectIdPerCall(null, opCtx),
-                ctx.kernalContext().job().currentTaskNameHash(),
-                opCtx != null && opCtx.skipStore(),
-                opCtx != null && opCtx.isKeepBinary(),
-                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
-                waitTopFut);
+            if (isFastMap(filters, op)) {
+                return new GridNearAtomicFastMapUpdateFuture(
+                    ctx,
+                    this,
+                    ctx.config().getWriteSynchronizationMode(),
+                    op,
+                    Collections.singletonList(key),
+                    val0 != null ? Collections.singletonList(val0) : null,
+                    invokeArgs,
+                    conflictPutVal != null ? Collections.singleton(conflictPutVal) : null,
+                    conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null,
+                    retval,
+                    false,
+                    opCtx != null ? opCtx.expiry() : null,
+                    filters,
+                    ctx.subjectIdPerCall(null, opCtx),
+                    ctx.kernalContext().job().currentTaskNameHash(),
+                    opCtx != null && opCtx.skipStore(),
+                    opCtx != null && opCtx.isKeepBinary(),
+                    opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                    waitTopFut);
+            }
+            else {
+                return new GridNearAtomicUpdateFuture(
+                    ctx,
+                    this,
+                    ctx.config().getWriteSynchronizationMode(),
+                    op,
+                    Collections.singletonList(key),
+                    val0 != null ? Collections.singletonList(val0) : null,
+                    invokeArgs,
+                    conflictPutVal != null ? Collections.singleton(conflictPutVal) : null,
+                    conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null,
+                    retval,
+                    false,
+                    opCtx != null ? opCtx.expiry() : null,
+                    filters,
+                    ctx.subjectIdPerCall(null, opCtx),
+                    ctx.kernalContext().job().currentTaskNameHash(),
+                    opCtx != null && opCtx.skipStore(),
+                    opCtx != null && opCtx.isKeepBinary(),
+                    opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                    waitTopFut);
+            }
         }
     }
 
     /**
+     * Whether this is fast-map operation.
+     *
+     * @param filters Filters.
+     * @param op Operation.
+     * @return {@code True} if fast-map.
+     */
+    public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation op) {
+        return F.isEmpty(filters) && op != TRANSFORM && ctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+            ctx.config().getAtomicWriteOrderMode() == CLOCK &&
+            !(ctx.writeThrough() && ctx.config().getInterceptor() != null);
+    }
+
+    /**
      * Entry point for all public API remove methods.
      *
      * @param keys Keys to remove.
@@ -1394,26 +1463,52 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             });
         }
 
-        final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
-            ctx,
-            this,
-            ctx.config().getWriteSynchronizationMode(),
-            DELETE,
-            keys != null ? keys : conflictMap.keySet(),
-            null,
-            null,
-            null,
-            drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
-            retval,
-            rawRetval,
-            opCtx != null ? opCtx.expiry() : null,
-            CU.filterArray(null),
-            subjId,
-            taskNameHash,
-            opCtx != null && opCtx.skipStore(),
-            opCtx != null && opCtx.isKeepBinary(),
-            opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
-            true);
+        final GridNearAtomicAbstractUpdateFuture updateFut;
+
+        if (isFastMap(null, DELETE)) {
+            updateFut = new GridNearAtomicFastMapUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                DELETE,
+                keys != null ? keys : conflictMap.keySet(),
+                null,
+                null,
+                null,
+                drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
+                retval,
+                rawRetval,
+                opCtx != null ? opCtx.expiry() : null,
+                CU.filterArray(null),
+                subjId,
+                taskNameHash,
+                opCtx != null && opCtx.skipStore(),
+                opCtx != null && opCtx.isKeepBinary(),
+                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                true);
+        }
+        else {
+            updateFut = new GridNearAtomicUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                DELETE,
+                keys != null ? keys : conflictMap.keySet(),
+                null,
+                null,
+                null,
+                drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
+                retval,
+                rawRetval,
+                opCtx != null ? opCtx.expiry() : null,
+                CU.filterArray(null),
+                subjId,
+                taskNameHash,
+                opCtx != null && opCtx.skipStore(),
+                opCtx != null && opCtx.isKeepBinary(),
+                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                true);
+        }
 
         if (async) {
             return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@@ -1773,29 +1868,36 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         return;
                     }
 
-                    // Do not check topology version if topology was locked on near node by
+                    // Do not check topology version for CLOCK versioning since
+                    // partition exchange will wait for near update future (if future is on server node).
+                    // Also do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+                    if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
+                        !needRemap(req.topologyVersion(), top.topologyVersion())) {
                         locked = lockEntries(req, req.topologyVersion());
 
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
-                        // Assign next version for update inside entries lock.
-                        GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
+                        GridCacheVersion ver = req.updateVersion();
+
+                        if (ver == null) {
+                            // Assign next version for update inside entries lock.
+                            ver = ctx.versions().next(top.topologyVersion());
 
-                        if (hasNear)
-                            res.nearVersion(ver);
+                            if (hasNear)
+                                res.nearVersion(ver);
 
-                        if (msgLog.isDebugEnabled()) {
-                            msgLog.debug("Assigned update version [futId=" + req.futureId() +
-                                ", writeVer=" + ver + ']');
+                            if (msgLog.isDebugEnabled()) {
+                                msgLog.debug("Assigned update version [futId=" + req.futureId() +
+                                    ", writeVer=" + ver + ']');
+                            }
                         }
 
                         assert ver != null : "Got null version for update request: " + req;
 
                         boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
 
-                        dhtFut = createDhtFuture(ver, req);
+                        dhtFut = req.fastMap() ? null : createDhtFuture(ver, req, res, completionCb);
 
                         expiry = expiryPolicy(req.expiry());
 
@@ -1812,6 +1914,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 hasNear,
                                 req,
                                 res,
+                                completionCb,
                                 locked,
                                 ver,
                                 dhtFut,
@@ -1831,6 +1934,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 hasNear,
                                 req,
                                 res,
+                                completionCb,
                                 locked,
                                 ver,
                                 dhtFut,
@@ -1910,9 +2014,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             completionCb.apply(req, res);
         }
-        else
+        else {
             if (dhtFut != null)
-                dhtFut.map(node, res.returnValue(), res, completionCb);
+                dhtFut.map(node, res.returnValue());
+            else {
+                assert req.fastMap() : req;
+
+                completionCb.apply(req, res);
+            }
+        }
 
         if (req.writeSynchronizationMode() != FULL_ASYNC)
             req.cleanup(!node.isLocal());
@@ -1943,6 +2053,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final boolean hasNear,
         final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         final List<GridDhtCacheEntry> locked,
         final GridCacheVersion ver,
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
@@ -1990,6 +2101,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         for (int i = 0; i < locked.size(); i++) {
             GridDhtCacheEntry entry = locked.get(i);
 
+            if (entry == null)
+                continue;
+
             try {
                 if (!checkFilter(entry, req, res)) {
                     if (expiry != null && entry.hasValue()) {
@@ -2095,6 +2209,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 dhtFut,
                                 req,
                                 res,
+                                completionCb,
                                 replicate,
                                 updRes,
                                 taskName,
@@ -2143,6 +2258,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 dhtFut,
                                 req,
                                 res,
+                                completionCb,
                                 replicate,
                                 updRes,
                                 taskName,
@@ -2269,6 +2385,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 dhtFut,
                 req,
                 res,
+                completionCb,
                 replicate,
                 updRes,
                 taskName,
@@ -2358,6 +2475,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean hasNear,
         GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
@@ -2373,6 +2491,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
 
+        boolean readersOnly = false;
+
         boolean intercept = ctx.config().getInterceptor() != null;
 
         AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
@@ -2388,12 +2508,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             try {
                 GridDhtCacheEntry entry = locked.get(i);
 
+                if (entry == null)
+                    continue;
+
                 GridCacheVersion newConflictVer = req.conflictVersion(i);
                 long newConflictTtl = req.conflictTtl(i);
                 long newConflictExpireTime = req.conflictExpireTime(i);
 
                 assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
 
+                boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), entry.partition(),
+                    req.topologyVersion());
+
                 Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
 
                 Collection<UUID> readers = null;
@@ -2411,18 +2537,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     op,
                     writeVal,
                     req.invokeArguments(),
-                    writeThrough() && !req.skipStore(),
+                    (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()))
+                        && writeThrough() && !req.skipStore(),
                     !req.skipStore(),
                     sndPrevVal || req.returnValue(),
                     req.keepBinary(),
                     expiry,
                     /*event*/true,
                     /*metrics*/true,
-                    /*primary*/true,
-                    /*verCheck*/false,
+                    primary,
+                    ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
                     topVer,
                     req.filter(),
-                    replicate ? DR_PRIMARY : DR_NONE,
+                    replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
                     newConflictTtl,
                     newConflictExpireTime,
                     newConflictVer,
@@ -2434,6 +2561,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     /*updateCntr*/null,
                     dhtFut);
 
+                if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+                    dhtFut = createDhtFuture(ver, req, res, completionCb);
+
+                    readersOnly = true;
+                }
+
                 if (dhtFut != null) {
                     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
                         GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
@@ -2445,17 +2578,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         EntryProcessor<Object, Object, Object> entryProcessor = null;
 
-                        dhtFut.addWriteEntry(
-                            affAssignment,
-                            entry,
-                            updRes.newValue(),
-                            entryProcessor,
-                            updRes.newTtl(),
-                            updRes.conflictExpireTime(),
-                            newConflictVer,
-                            sndPrevVal,
-                            updRes.oldValue(),
-                            updRes.updateCounter());
+                        if (!readersOnly) {
+                            dhtFut.addWriteEntry(
+                                affAssignment,
+                                entry,
+                                updRes.newValue(),
+                                entryProcessor,
+                                updRes.newTtl(),
+                                updRes.conflictExpireTime(),
+                                newConflictVer,
+                                sndPrevVal,
+                                updRes.oldValue(),
+                                updRes.updateCounter());
+                        }
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(
@@ -2474,7 +2609,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 if (hasNear) {
-                    if (updRes.sendToDht()) {
+                    if (primary && updRes.sendToDht()) {
                         if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
                             // If put the same value as in request then do not need to send it back.
                             if (op == TRANSFORM || writeVal != updRes.newValue()) {
@@ -2580,6 +2715,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
         final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         final boolean replicate,
         final UpdateBatchResult batchRes,
         final String taskName,
@@ -2600,8 +2736,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             GridCacheOperation op;
 
             if (putMap != null) {
+                // If fast mapping, filter primary keys for write to store.
+                Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ?
+                    F.view(putMap, new P1<CacheObject>() {
+                        @Override public boolean apply(CacheObject key) {
+                            return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
+                        }
+                    }) :
+                    putMap;
+
                 try {
-                    ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
+                    ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
                         @Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) {
                             return F.t(v, ver);
                         }
@@ -2614,8 +2759,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 op = UPDATE;
             }
             else {
+                // If fast mapping, filter primary keys for write to store.
+                Collection<KeyCacheObject> storeKeys = req.fastMap() ?
+                    F.view(rmvKeys, new P1<Object>() {
+                        @Override public boolean apply(Object key) {
+                            return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
+                        }
+                    }) :
+                    rmvKeys;
+
                 try {
-                    ctx.store().removeAll(null, rmvKeys);
+                    ctx.store().removeAll(null, storeKeys);
                 }
                 catch (CacheStorePartialUpdateException e) {
                     storeErr = e;
@@ -2650,6 +2804,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     assert writeVal != null || op == DELETE : "null write value found.";
 
+                    boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(),
+                        entry.partition(),
+                        req.topologyVersion());
+
                     Collection<UUID> readers = null;
                     Collection<UUID> filteredReaders = null;
 
@@ -2672,11 +2830,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         expiry,
                         /*event*/true,
                         /*metrics*/true,
-                        /*primary*/true,
-                        /*verCheck*/false,
+                        primary,
+                        ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
                         topVer,
                         null,
-                        replicate ? DR_PRIMARY : DR_NONE,
+                        replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
                         CU.TTL_NOT_CHANGED,
                         CU.EXPIRE_TIME_CALCULATE,
                         null,
@@ -2710,23 +2868,31 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     batchRes.addDeleted(entry, updRes, entries);
 
+                    if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+                        dhtFut = createDhtFuture(ver, req, res, completionCb);
+
+                        batchRes.readersOnly(true);
+                    }
+
                     if (dhtFut != null) {
                         EntryProcessor<Object, Object, Object> entryProcessor =
                             entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
 
-                        dhtFut.addWriteEntry(
-                            affAssignment,
-                            entry,
-                            writeVal,
-                            entryProcessor,
-                            updRes.newTtl(),
-                            CU.EXPIRE_TIME_CALCULATE,
-                            null,
-                            sndPrevVal,
-                            updRes.oldValue(),
-                            updRes.updateCounter());
+                        if (!batchRes.readersOnly()) {
+                            dhtFut.addWriteEntry(
+                                affAssignment,
+                                entry,
+                                writeVal,
+                                entryProcessor,
+                                updRes.newTtl(),
+                                CU.EXPIRE_TIME_CALCULATE,
+                                null,
+                                sndPrevVal,
+                                updRes.oldValue(),
+                                updRes.updateCounter());
+                        }
 
-                        if (!F.isEmpty(filteredReaders))
+                        if (!F.isEmpty(filteredReaders)) {
                             dhtFut.addNearWriteEntries(
                                 filteredReaders,
                                 entry,
@@ -2734,29 +2900,34 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 entryProcessor,
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE);
+                        }
                     }
 
                     if (hasNear) {
-                        if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
-                            int idx = firstEntryIdx + i;
-
-                            if (req.operation() == TRANSFORM) {
-                                res.addNearValue(idx,
-                                    writeVal,
-                                    updRes.newTtl(),
-                                    CU.EXPIRE_TIME_CALCULATE);
-                            }
-                            else
-                                res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+                        if (primary) {
+                            if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
+                                int idx = firstEntryIdx + i;
+
+                                if (req.operation() == TRANSFORM) {
+                                    res.addNearValue(idx,
+                                        writeVal,
+                                        updRes.newTtl(),
+                                        CU.EXPIRE_TIME_CALCULATE);
+                                }
+                                else
+                                    res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
 
-                            if (writeVal != null || entry.hasValue()) {
-                                IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
+                                if (writeVal != null || entry.hasValue()) {
+                                    IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
 
-                                assert f == null : f;
+                                    assert f == null : f;
+                                }
                             }
+                            else if (readers.contains(nearNode.id())) // Reader became primary or backup.
+                                entry.removeReader(nearNode.id(), req.messageId());
+                            else
+                                res.addSkippedIndex(firstEntryIdx + i);
                         }
-                        else if (readers.contains(nearNode.id())) // Reader became primary or backup.
-                            entry.removeReader(nearNode.id(), req.messageId());
                         else
                             res.addSkippedIndex(firstEntryIdx + i);
                     }
@@ -2800,14 +2971,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             KeyCacheObject key = req.key(0);
 
             while (true) {
-                GridDhtCacheEntry entry = entryExx(key, topVer);
+                try {
+                    GridDhtCacheEntry entry = entryExx(key, topVer);
 
-                GridUnsafe.monitorEnter(entry);
+                    GridUnsafe.monitorEnter(entry);
 
-                if (entry.obsolete())
-                    GridUnsafe.monitorExit(entry);
-                else
-                    return Collections.singletonList(entry);
+                    if (entry.obsolete())
+                        GridUnsafe.monitorExit(entry);
+                    else
+                        return Collections.singletonList(entry);
+                }
+                catch (GridDhtInvalidPartitionException e) {
+                    // Ignore invalid partition exception in CLOCK ordering mode.
+                    if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+                        return Collections.singletonList(null);
+                    else
+                        throw e;
+                }
             }
         }
         else {
@@ -2815,9 +2995,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             while (true) {
                 for (int i = 0; i < req.size(); i++) {
-                    GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
+                    try {
+                        GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
 
-                    locked.add(entry);
+                        locked.add(entry);
+                    }
+                    catch (GridDhtInvalidPartitionException e) {
+                        // Ignore invalid partition exception in CLOCK ordering mode.
+                        if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+                            locked.add(null);
+                        else
+                            throw e;
+                    }
                 }
 
                 boolean retry = false;
@@ -2970,28 +3159,54 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             drPutVals = null;
         }
 
-        GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
-            ctx,
-            this,
-            ctx.config().getWriteSynchronizationMode(),
-            req.operation(),
-            req.keys(),
-            vals,
-            req.invokeArguments(),
-            drPutVals,
-            drRmvVals,
-            req.returnValue(),
-            false,
-            req.expiry(),
-            req.filter(),
-            req.subjectId(),
-            req.taskNameHash(),
-            req.skipStore(),
-            req.keepBinary(),
-            MAX_RETRIES,
-            true);
+        if (isFastMap(req.filter(), req.operation())) {
+            GridNearAtomicFastMapUpdateFuture updateFut = new GridNearAtomicFastMapUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                req.operation(),
+                req.keys(),
+                vals,
+                req.invokeArguments(),
+                drPutVals,
+                drRmvVals,
+                req.returnValue(),
+                false,
+                req.expiry(),
+                req.filter(),
+                req.subjectId(),
+                req.taskNameHash(),
+                req.skipStore(),
+                req.keepBinary(),
+                MAX_RETRIES,
+                true);
 
-        updateFut.map();
+            updateFut.map();
+        }
+        else {
+            GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                req.operation(),
+                req.keys(),
+                vals,
+                req.invokeArguments(),
+                drPutVals,
+                drRmvVals,
+                req.returnValue(),
+                false,
+                req.expiry(),
+                req.filter(),
+                req.subjectId(),
+                req.taskNameHash(),
+                req.skipStore(),
+                req.keepBinary(),
+                MAX_RETRIES,
+                true);
+
+            updateFut.map();
+        }
     }
 
     /**
@@ -3003,12 +3218,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb
     ) {
         if (updateReq.size() == 1)
-            return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
+            return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq, updateRes, completionCb);
         else
-            return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
+            return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq, updateRes, completionCb);
     }
 
     /**
@@ -3520,6 +3737,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private GridDhtAtomicAbstractUpdateFuture dhtFut;
 
         /** */
+        private boolean readersOnly;
+
+        /** */
         private GridCacheReturn invokeRes;
 
         /**
@@ -3572,6 +3792,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) {
             this.dhtFut = dhtFut;
         }
+
+        /**
+         * @return {@code True} if only readers (not backups) should be updated.
+         */
+        private boolean readersOnly() {
+            return readersOnly;
+        }
+
+        /**
+         * @param readersOnly {@code True} if only readers (not backups) should be updated.
+         */
+        private void readersOnly(boolean readersOnly) {
+            this.readersOnly = readersOnly;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 8ebe9c3..8ad6496 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -50,9 +50,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
     GridDhtAtomicSingleUpdateFuture(
         GridCacheContext cctx,
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb
     ) {
-        super(cctx, writeVer, updateReq);
+        super(cctx, writeVer, updateReq, updateRes, completionCb);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5d5ddf0..6de08c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -49,9 +49,11 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb
     ) {
-        super(cctx, writeVer, updateReq);
+        super(cctx, writeVer, updateReq, updateRes, completionCb);
 
         mappings = U.newHashMap(updateReq.size());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 6811236..10f368e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@ -73,6 +73,8 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
         boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean fastMap,
+        boolean clientReq,
         boolean mappingKnown,
         boolean skipStore,
         boolean keepBinary,
@@ -88,6 +90,8 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
             retval,
             subjId,
             taskNameHash,
+            fastMap,
+            clientReq,
             mappingKnown,
             skipStore,
             keepBinary,

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 39abb73..59f3e76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -51,6 +52,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
@@ -140,6 +142,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     @GridToStringInclude
     protected Long futId;
 
+    /** */
+    protected GridCacheVersion updVer;
+
     /** Operation result. */
     protected GridCacheReturn opRes;
 
@@ -208,12 +213,32 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     }
 
     /** {@inheritDoc} */
-    @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
         return null;
     }
 
     /**
      * @param req Request.
+     * @param topVer Topology version.
+     */
+    final void initUpdateVersion(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) {
+        // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+        if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+            GridCacheVersion updVer = this.updVer;
+
+            if (updVer == null) {
+                this.updVer = updVer = cctx.versions().next(topVer);
+
+                if (log.isDebugEnabled())
+                    log.debug("Assigned fast-map version for update on near node: " + updVer);
+            }
+
+            req.updateVersion(updVer);
+        }
+    }
+
+    /**
+     * @param req Request.
      */
     void sendCheckUpdateRequest(GridNearAtomicCheckUpdateRequest req) {
         try {
@@ -241,14 +266,29 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             // Cannot remap.
             remapCnt = 1;
 
-            map(topVer);
+            if (fastMap()) {
+                Long futId = addAtomicFuture(topVer);
+
+                if (futId != null)
+                    map(topVer, futId);
+            }
+            else
+                map(topVer, futId);
         }
     }
 
     /**
+     * @return {@code True} for fast map update mode.
+     */
+    protected boolean fastMap() {
+        return false;
+    }
+
+    /**
      * @param topVer Topology version.
+     * @param futId Future ID.
      */
-    protected abstract void map(AffinityTopologyVersion topVer);
+    protected abstract void map(AffinityTopologyVersion topVer, Long futId);
 
     /**
      * Maps future on ready topology.
@@ -274,7 +314,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
      */
     final boolean storeFuture() {
-        return syncMode != FULL_ASYNC;
+        return syncMode != FULL_ASYNC || cctx.config().getAtomicWriteOrderMode() == CLOCK;
     }
 
     /**
@@ -416,6 +456,30 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     }
 
     /**
+     * Adds a future that prevents topology change before the operation completes.
+     * Should be invoked before the topology lock is released.
+     *
+     * @param topVer Topology version.
+     * @return Future ID, or {@code null} if the future could not be registered.
+     */
+    protected final Long addAtomicFuture(AffinityTopologyVersion topVer) {
+        Long futId = cctx.mvcc().atomicFutureId();
+
+        synchronized (mux) {
+            assert this.futId == null : this;
+            assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+            this.topVer = topVer;
+            this.futId = futId;
+        }
+
+        if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this))
+            return null;
+
+        return futId;
+    }
+
+    /**
      *
      */
     static class PrimaryRequestState {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index a43bfb0..bdf2678 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -48,7 +48,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
-    /** . */
+    /** */
     private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01;
 
     /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
@@ -63,6 +63,15 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     /** Return value flag. */
     private static final int RET_VAL_FLAG_MASK = 0x10;
 
+    /** Fast map update flag. */
+    private static final int FAST_MAP_FLAG_MASK = 0x20;
+
+    /** Client node request flag. */
+    private static final int CLIENT_REQ_FLAG_MASK = 0x40;
+
+    /** Flag indicating whether this request contains primary keys. */
+    private static final int HAS_PRIMARY_FLAG_MASK = 0x80;
+
     /** Target node ID. */
     @GridDirectTransient
     protected UUID nodeId;
@@ -93,6 +102,9 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     @GridDirectTransient
     private GridNearAtomicUpdateResponse res;
 
+    /** Update version. Set to non-null if fastMap is {@code true}. */
+    private GridCacheVersion updateVer;
+
     /**
      *
      */
@@ -129,6 +141,8 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
         boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean fastMap,
+        boolean clientReq,
         boolean needPrimaryRes,
         boolean skipStore,
         boolean keepBinary,
@@ -144,16 +158,62 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
         this.taskNameHash = taskNameHash;
         this.addDepInfo = addDepInfo;
 
+        if (fastMap)
+            setFlag(true, FAST_MAP_FLAG_MASK);
+        if (clientReq)
+            setFlag(true, CLIENT_REQ_FLAG_MASK);
         if (needPrimaryRes)
-            needPrimaryResponse(true);
+            setFlag(true, NEED_PRIMARY_RES_FLAG_MASK);
         if (topLocked)
-            topologyLocked(true);
+            setFlag(true, TOP_LOCKED_FLAG_MASK);
         if (retval)
-            returnValue(true);
+            setFlag(true, RET_VAL_FLAG_MASK);
         if (skipStore)
-            skipStore(true);
+            setFlag(true, SKIP_STORE_FLAG_MASK);
         if (keepBinary)
-            keepBinary(true);
+            setFlag(true, KEEP_BINARY_FLAG_MASK);
+    }
+
+    /**
+     * @return Flag indicating whether this request contains primary keys.
+     */
+    boolean hasPrimary() {
+        return isFlag(HAS_PRIMARY_FLAG_MASK);
+    }
+
+    /**
+     * @param val Flag indicating whether this request contains primary keys.
+     */
+    void hasPrimary(boolean val) {
+        setFlag(val, HAS_PRIMARY_FLAG_MASK);
+    }
+
+    /**
+     * @return Flag indicating whether this is a fast-map update.
+     */
+    boolean fastMap() {
+        return isFlag(FAST_MAP_FLAG_MASK);
+    }
+
+    /**
+     * @return {@code True} if the request was sent from a client node.
+     */
+    boolean clientRequest() {
+        return isFlag(CLIENT_REQ_FLAG_MASK);
+    }
+
+    /**
+     * @return Update version for fast-map request.
+     */
+    @Nullable public final GridCacheVersion updateVersion() {
+        return updateVer;
+    }
+
+    /**
+     * @param updateVer Update version for fast-map request.
+     */
+    final void updateVersion(GridCacheVersion updateVer) {
+        this.updateVer = updateVer;
     }
 
     /** {@inheritDoc} */
@@ -291,13 +351,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
-     * @param val {@code True} if topology is locked on near node.
-     */
-    private void topologyLocked(boolean val) {
-        setFlag(val, TOP_LOCKED_FLAG_MASK);
-    }
-
-    /**
      * @return Return value flag.
      */
     public final boolean returnValue() {
@@ -305,13 +358,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
-     * @param val Return value flag.
-     */
-    public final void returnValue(boolean val) {
-        setFlag(val, RET_VAL_FLAG_MASK);
-    }
-
-    /**
      * @return Skip write-through to a persistent storage.
      */
     public final boolean skipStore() {
@@ -319,13 +365,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
-     * @param val Skip store flag.
-     */
-    public void skipStore(boolean val) {
-        setFlag(val, SKIP_STORE_FLAG_MASK);
-    }
-
-    /**
      * @return Keep binary flag.
      */
     public final boolean keepBinary() {
@@ -333,13 +372,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
-     * @param val Keep binary flag.
-     */
-    public void keepBinary(boolean val) {
-        setFlag(val, KEEP_BINARY_FLAG_MASK);
-    }
-
-    /**
      * Sets flag mask.
      *
      * @param flag Set or clear.
@@ -380,12 +412,14 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      * @param conflictTtl Conflict TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
+     * @param primary Whether the given key is primary on this mapping.
      */
     abstract void addUpdateEntry(KeyCacheObject key,
         @Nullable Object val,
         long conflictTtl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer);
+        @Nullable GridCacheVersion conflictVer,
+        boolean primary);
 
     /**
      * @return Keys for this update request.
@@ -458,7 +492,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 10;
+        return 11;
     }
 
     /** {@inheritDoc} */
@@ -518,6 +552,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
 
                 writer.incrementState();
 
+            case 10:
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -598,6 +638,14 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
 
                 reader.incrementState();
 
+            case 10:
+                updateVer = reader.readMessage("updateVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridNearAtomicAbstractUpdateRequest.class);


Mime
View raw message