ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
Date Tue, 21 Jul 2015 23:56:16 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-104 [created] 4f14522ab


IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4f14522a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4f14522a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4f14522a

Branch: refs/heads/ignite-104
Commit: 4f14522ab7b92b38810fd24ec15bcb094f480d08
Parents: 73a2b14
Author: Valentin Kulichenko <vkulichenko@gridgain.com>
Authored: Tue Jul 21 16:55:32 2015 -0700
Committer: Valentin Kulichenko <vkulichenko@gridgain.com>
Committed: Tue Jul 21 16:55:32 2015 -0700

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  21 ++
 .../org/apache/ignite/internal/GridTopic.java   |  73 +++++++
 .../processors/cache/GridCacheIoManager.java    |  32 ++-
 .../processors/cache/GridCacheUtils.java        |  12 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  60 +++--
 .../GridDhtAtomicDeferredUpdateResponse.java    |  16 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 129 +++++++++--
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  15 +-
 .../dht/atomic/GridDhtAtomicUpdateResponse.java |  17 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 219 ++++++++++++++-----
 .../dht/atomic/GridNearAtomicUpdateRequest.java |  15 +-
 .../atomic/GridNearAtomicUpdateResponse.java    |  14 +-
 12 files changed, 519 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 3ad0f01..83847dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -219,6 +219,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Write ordering mode. */
     private CacheAtomicWriteOrderMode atomicWriteOrderMode;
 
+    /** Ordered updates mode. */
+    private boolean atomicOrderedUpdates;
+
     /** Number of backups for cache. */
     private int backups = DFLT_BACKUPS;
 
@@ -345,6 +348,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         aff = cc.getAffinity();
         affMapper = cc.getAffinityMapper();
         atomicityMode = cc.getAtomicityMode();
+        atomicOrderedUpdates = cc.isAtomicOrderedUpdates();
         atomicWriteOrderMode = cc.getAtomicWriteOrderMode();
         backups = cc.getBackups();
         cacheLoaderFactory = cc.getCacheLoaderFactory();
@@ -896,6 +900,23 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     }
 
     /**
+     * @return Ordered updates mode flag ({@code true} if ordered updates are enabled for the ATOMIC cache).
+     */
+    public boolean isAtomicOrderedUpdates() {
+        return atomicOrderedUpdates;
+    }
+
+    /**
+     * @param atomicOrderedUpdates Ordered updates mode flag ({@code true} to enable ordered updates).
+     * @return {@code this} for chaining.
+     */
+    public CacheConfiguration<K, V> setAtomicOrderedUpdates(boolean atomicOrderedUpdates) {
+        this.atomicOrderedUpdates = atomicOrderedUpdates;
+
+        return this;
+    }
+
+    /**
      * Gets number of nodes used to back up single partition for {@link CacheMode#PARTITIONED} cache.
      * <p>
      * If not set, default value is {@link #DFLT_BACKUPS}.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index ba3b8b2..1ed8725 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -183,6 +183,15 @@ public enum GridTopic {
     }
 
     /**
+     * @param id1 First {@code int} ID of the composite topic key.
+     * @param id2 Second {@code int} ID of the composite topic key.
+     * @return Grid message topic object combining this topic with the specified IDs.
+     */
+    public Object topic(int id1, int id2) {
+        return new T9(this, id1, id2);
+    }
+
+    /**
      *
      */
     private static class T1 implements Externalizable {
@@ -756,4 +765,68 @@ public enum GridTopic {
             return S.toString(T8.class, this);
         }
     }
+
+    /** Topic key composed of a base {@link GridTopic} and two {@code int} IDs.
+     *  Externalized as: topic ordinal (one byte), then both IDs as {@code int}s. */
+    private static class T9 implements Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Base topic. */
+        private GridTopic topic;
+
+        /** First ID. */
+        private int id1;
+
+        /** Second ID. */
+        private int id2;
+
+        /**
+         * No-arg constructor needed for {@link Externalizable}.
+         */
+        public T9() {
+            // No-op.
+        }
+
+        /**
+         * @param topic Topic.
+         * @param id1 ID1.
+         * @param id2 ID2.
+         */
+        private T9(GridTopic topic, int id1, int id2) {
+            this.topic = topic;
+            this.id1 = id1;
+            this.id2 = id2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return topic.ordinal() + 31 * id1 + 31 * id2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            if (obj != null && obj.getClass() == T9.class) {
+                T9 that = (T9)obj;
+
+                return topic == that.topic && id1 == that.id1 && id2 == that.id2;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeByte(topic.ordinal());
+            out.writeInt(id1);
+            out.writeInt(id2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            topic = fromOrdinal(in.readByte());
+            id1 = in.readInt(); // Must mirror writeInt() in writeExternal(); readByte() would truncate the ID.
+            id2 = in.readInt(); // Same: both IDs are written as 4-byte ints.
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 84e4dc2..dec6aef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -378,7 +378,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
                     ctx.cacheId(),
-                    req.futureVersion());
+                    req.futureVersion(),
+                    req.partition());
 
                 res.onError(req.classError());
 
@@ -393,7 +394,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
-                    req.futureVersion());
+                    req.futureVersion(),
+                    req.partition());
 
                 res.error(req.classError());
 
@@ -745,13 +747,32 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         ClusterNode n = cctx.discovery().node(nodeId);
 
         if (n == null)
-            throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" + nodeId +
-                ", msg=" + msg + ']');
+            throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" +
+                nodeId + ", msg=" + msg + ']');
 
         send(n, msg, plc);
     }
 
     /**
+     * @param nodeId Destination node ID (resolved to a node through discovery).
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc IO policy.
+     * @param timeout Timeout to keep a message on receiving queue.
+     * @throws IgniteCheckedException Thrown in case of any errors, including when the node left the grid.
+     */
+    public void sendOrderedMessage(UUID nodeId, Object topic, GridCacheMessage msg, byte plc, long timeout)
+        throws IgniteCheckedException {
+        ClusterNode n = cctx.discovery().node(nodeId);
+
+        if (n == null)
+            throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" +
+                nodeId + ", msg=" + msg + ']');
+
+        sendOrderedMessage(n, topic, msg, plc, timeout);
+    }
+
+    /**
      * @param node Destination node.
      * @param topic Topic to send the message to.
      * @param msg Message to send.
@@ -779,7 +800,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             }
             catch (IgniteCheckedException e) {
                 if (cctx.discovery().node(node.id()) == null)
-                    throw new ClusterTopologyCheckedException("Node left grid while sending ordered message to: " + node.id(), e);
+                    throw new ClusterTopologyCheckedException("Node left grid while sending ordered message to: " +
+                        node.id(), e);
 
                 if (cnt == retryCnt)
                     throw e;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index e16e30d..b0edc3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -57,7 +57,6 @@ import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.cache.CacheRebalanceMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
 import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.IgniteNodeAttributes.*;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
 
 /**
@@ -1750,4 +1749,15 @@ public class GridCacheUtils {
             }
         };
     }
+
+    /** NOTE(review): "Massage" in the method name looks like a typo for "Message"; renaming needs all callers updated.
+     * @param ctx Cache context (its cache ID becomes the first topic ID).
+     * @param part Partition (asserted to be non-negative).
+     * @return Per-partition message topic built from {@code TOPIC_CACHE}, the cache ID and the partition.
+     */
+    public static Object partitionMassageTopic(GridCacheContext ctx, int part) {
+        assert part >= 0;
+
+        return TOPIC_CACHE.topic(ctx.cacheId(), part);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 0a21979..38073f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -181,11 +181,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
-            @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
-                processNearAtomicUpdateRequest(nodeId, req);
+        if (ctx.config().isAtomicOrderedUpdates()) {
+            for (int part = 0; part < ctx.affinity().partitions(); part++) {
+                ctx.io().addOrderedHandler(CU.partitionMassageTopic(ctx, part), new CI2<UUID, GridNearAtomicUpdateRequest>() {
+                    @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+                        processNearAtomicUpdateRequest(nodeId, req);
+                    }
+                });
             }
-        });
+        }
+        else {
+            ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
+                @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+                    processNearAtomicUpdateRequest(nodeId, req);
+                }
+            });
+        }
 
         ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
             @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
@@ -193,11 +204,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
-            @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
-                processDhtAtomicUpdateRequest(nodeId, req);
+        if (ctx.config().isAtomicOrderedUpdates()) {
+            for (int part = 0; part < ctx.affinity().partitions(); part++) {
+                ctx.io().addOrderedHandler(CU.partitionMassageTopic(ctx, part), new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+                    @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
+                        processDhtAtomicUpdateRequest(nodeId, req);
+                    }
+                });
             }
-        });
+        }
+        else {
+            ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+                @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
+                    processDhtAtomicUpdateRequest(nodeId, req);
+                }
+            });
+        }
 
         ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() {
             @Override public void apply(UUID nodeId, GridDhtAtomicUpdateResponse res) {
@@ -1017,7 +1039,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     ) {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
             nodeId,
-            req.futureVersion());
+            req.futureVersion(),
+            req.partition());
 
         List<KeyCacheObject> keys = req.keys();
 
@@ -2389,7 +2412,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridCacheVersion ver = req.writeVersion();
 
         // Always send update reply.
-        GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion());
+        GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
+            ctx.cacheId(), req.futureVersion(), req.partition());
 
         Boolean replicate = ctx.isDrEnabled();
 
@@ -2477,7 +2501,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 ctx.io().send(nodeId, res, ctx.ioPolicy());
             else {
                 // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
-                sendDeferredUpdateResponse(nodeId, req.futureVersion());
+                sendDeferredUpdateResponse(nodeId, req.futureVersion(), req.partition());
             }
         }
         catch (ClusterTopologyCheckedException ignored) {
@@ -2494,7 +2518,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param nodeId Node ID to send message to.
      * @param ver Version to ack.
      */
-    private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) {
+    private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver, int part) {
         while (true) {
             DeferredResponseBuffer buf = pendingResponses.get(nodeId);
 
@@ -2511,7 +2535,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     buf = old;
             }
 
-            if (!buf.addResponse(ver))
+            if (!buf.addResponse(ver, part))
                 // Some thread is sending filled up buffer, we can remove it.
                 pendingResponses.remove(nodeId, buf);
             else
@@ -2551,7 +2575,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver);
 
             if (updateFut != null)
-                updateFut.onResult(nodeId);
+                updateFut.onResult(nodeId, res);
             else
                 U.warn(log, "Failed to find DHT update future for deferred update response [nodeId=" +
                     nodeId + ", ver=" + ver + ", res=" + res + ']');
@@ -2751,6 +2775,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         /** Response versions. */
         private Collection<GridCacheVersion> respVers = new ConcurrentLinkedDeque8<>();
 
+        /** Response partitions. */
+        private Collection<Integer> respParts = new GridConcurrentHashSet<>();
+
         /** Node ID. */
         private final UUID nodeId;
 
@@ -2805,7 +2832,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
          * @param ver Version to send.
          * @return {@code True} if response was handled, {@code false} if this buffer is filled and cannot be used.
          */
-        public boolean addResponse(GridCacheVersion ver) {
+        public boolean addResponse(GridCacheVersion ver, int part) {
             readLock().lock();
 
             boolean snd = false;
@@ -2815,6 +2842,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     return false;
 
                 respVers.add(ver);
+                respParts.add(part);
 
                 if  (respVers.size() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
                     snd = true;
@@ -2845,7 +2873,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
          */
         private void finish() {
             GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
-                respVers);
+                respVers, respParts);
 
             try {
                 ctx.kernalContext().gateway().readLock();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index 1163761..e203b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -41,6 +41,10 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
     @GridDirectCollection(GridCacheVersion.class)
     private Collection<GridCacheVersion> futVers;
 
+    /** Partitions. */
+    @GridDirectCollection(int.class)
+    private Collection<Integer> parts;
+
     /** {@inheritDoc} */
     @Override public int lookupIndex() {
         return CACHE_MSG_IDX;
@@ -57,12 +61,15 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
      * Constructor.
      *
      * @param futVers Future versions.
+     * @param parts Partitions.
      */
-    public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection<GridCacheVersion> futVers) {
+    public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection<GridCacheVersion> futVers,
+        Collection<Integer> parts) {
         assert !F.isEmpty(futVers);
 
         this.cacheId = cacheId;
         this.futVers = futVers;
+        this.parts = parts;
     }
 
     /**
@@ -72,6 +79,13 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
         return futVers;
     }
 
+    /**
+     * @return Partitions. NOTE(review): no writeTo/readFrom change is visible in this diff; confirm the field is sent.
+     */
+    public Collection<Integer> partitions() {
+        return parts;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4b1a58f..f7e574d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -73,7 +73,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** Mappings. */
     @GridToStringInclude
-    private ConcurrentMap<UUID, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
+    private ConcurrentMap<MappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
 
     /** Entries with readers. */
     private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
@@ -135,7 +135,11 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** {@inheritDoc} */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
+        return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() {
+            @Override public ClusterNode apply(MappingKey mappingKey) {
+                return cctx.kernalContext().discovery().node(mappingKey.nodeId);
+            }
+        }), F.notNull());
     }
 
     /** {@inheritDoc} */
@@ -143,11 +147,16 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         if (log.isDebugEnabled())
             log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']');
 
-        GridDhtAtomicUpdateRequest req = mappings.get(nodeId);
+        Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size());
+
+        for (MappingKey mappingKey : mappings.keySet()) {
+            if (mappingKey.nodeId.equals(nodeId))
+                mappingKeys.add(mappingKey);
+        }
 
-        if (req != null) {
-            // Remove only after added keys to failed set.
-            mappings.remove(nodeId);
+        if (!mappingKeys.isEmpty()) {
+            for (MappingKey mappingKey : mappingKeys)
+                mappings.remove(mappingKey);
 
             checkComplete();
 
@@ -201,7 +210,12 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         @Nullable GridCacheVersion conflictVer) {
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
-        Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
+        int part = entry.partition();
+
+        Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(part, topVer);
+
+        if (!cctx.config().isAtomicOrderedUpdates())
+            part = -1;
 
         if (log.isDebugEnabled())
             log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
@@ -213,8 +227,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         for (ClusterNode node : dhtNodes) {
             UUID nodeId = node.id();
 
+            MappingKey mappingKey = new MappingKey(nodeId, part);
+
             if (!nodeId.equals(cctx.localNodeId())) {
-                GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+                GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
 
                 if (updateReq == null) {
                     updateReq = new GridDhtAtomicUpdateRequest(
@@ -227,9 +243,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                         forceTransformBackups,
                         this.updateReq.subjectId(),
                         this.updateReq.taskNameHash(),
-                        forceTransformBackups ? this.updateReq.invokeArguments() : null);
+                        forceTransformBackups ? this.updateReq.invokeArguments() : null,
+                        part);
 
-                    mappings.put(nodeId, updateReq);
+                    mappings.put(mappingKey, updateReq);
                 }
 
                 updateReq.addWriteValue(entry.key(),
@@ -262,8 +279,12 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
+        int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1;
+
         for (UUID nodeId : readers) {
-            GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+            MappingKey mappingKey = new MappingKey(nodeId, part);
+
+            GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
 
             if (updateReq == null) {
                 ClusterNode node = cctx.discovery().node(nodeId);
@@ -282,9 +303,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                     forceTransformBackups,
                     this.updateReq.subjectId(),
                     this.updateReq.taskNameHash(),
-                    forceTransformBackups ? this.updateReq.invokeArguments() : null);
+                    forceTransformBackups ? this.updateReq.invokeArguments() : null,
+                    part);
 
-                mappings.put(nodeId, updateReq);
+                mappings.put(mappingKey, updateReq);
             }
 
             if (nearReadersEntries == null)
@@ -319,24 +341,36 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      */
     public void map() {
         if (!mappings.isEmpty()) {
-            for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+            for (Map.Entry<MappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) {
+                MappingKey mappingKey = e.getKey();
+                GridDhtAtomicUpdateRequest req = e.getValue();
+
                 try {
                     if (log.isDebugEnabled())
                         log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                    if (mappingKey.part >= 0) {
+                        Object topic = CU.partitionMassageTopic(cctx, mappingKey.part);
+
+                        cctx.io().sendOrderedMessage(mappingKey.nodeId, topic, req, cctx.ioPolicy(), 0);
+                    }
+                    else {
+                        assert mappingKey.part == -1;
+
+                        cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                    }
                 }
                 catch (ClusterTopologyCheckedException ignored) {
                     U.warn(log, "Failed to send update request to backup node because it left grid: " +
                         req.nodeId());
 
-                    mappings.remove(req.nodeId());
+                    mappings.remove(mappingKey);
                 }
-                catch (IgniteCheckedException e) {
+                catch (IgniteCheckedException ex) {
                     U.error(log, "Failed to send update request to backup node (did node leave the grid?): "
-                        + req.nodeId(), e);
+                        + req.nodeId(), ex);
 
-                    mappings.remove(req.nodeId());
+                    mappings.remove(mappingKey);
                 }
             }
         }
@@ -376,7 +410,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             }
         }
 
-        mappings.remove(nodeId);
+        mappings.remove(new MappingKey(nodeId, updateRes.partition()));
 
         checkComplete();
     }
@@ -385,12 +419,14 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      * Deferred update response.
      *
      * @param nodeId Backup node ID.
+     * @param res Response.
      */
-    public void onResult(UUID nodeId) {
+    public void onResult(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) {
         if (log.isDebugEnabled())
             log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
 
-        mappings.remove(nodeId);
+        for (Integer part : res.partitions())
+            mappings.remove(new MappingKey(nodeId, part));
 
         checkComplete();
     }
@@ -412,4 +448,53 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     @Override public String toString() {
         return S.toString(GridDhtAtomicUpdateFuture.class, this);
     }
+
+    /** Key of the {@code mappings} map: destination node ID plus partition.
+     *  The partition is {@code -1} when atomic ordered updates are disabled. */
+    private static class MappingKey {
+        /** Node ID. */
+        private final UUID nodeId;
+
+        /** Partition, or {@code -1} when updates are not ordered. */
+        private final int part;
+
+        /**
+         * @param nodeId Node ID (asserted to be non-null).
+         * @param part Partition (asserted to be {@code >= -1}).
+         */
+        private MappingKey(UUID nodeId, int part) {
+            assert nodeId != null;
+            assert part >= -1 : part;
+
+            this.nodeId = nodeId;
+            this.part = part;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            MappingKey key = (MappingKey)o;
+
+            return nodeId.equals(key.nodeId) && part == key.part;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = nodeId.hashCode();
+
+            res = 31 * res + part;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MappingKey.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index f83b8fa..031edb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -127,6 +127,9 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     /** Task name hash. */
     private int taskNameHash;
 
+    /** Partition. NOTE(review): no serialization change for this field is visible in this patch - confirm. */
+    private int part;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -147,6 +150,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @param forceTransformBackups Force transform backups flag.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
+     * @param part Partition.
      */
     public GridDhtAtomicUpdateRequest(
         int cacheId,
@@ -158,7 +162,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         boolean forceTransformBackups,
         UUID subjId,
         int taskNameHash,
-        Object[] invokeArgs
+        Object[] invokeArgs,
+        int part
     ) {
         assert invokeArgs == null || forceTransformBackups;
 
@@ -172,6 +177,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
         this.invokeArgs = invokeArgs;
+        this.part = part;
 
         keys = new ArrayList<>();
 
@@ -318,6 +324,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     }
 
     /**
+     * @return Partition.
+     */
+    public int partition() {
+        return part;
+    }
+
+    /**
      * @return Node ID.
      */
     public UUID nodeId() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index c5b5a37..509a918 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -59,6 +59,9 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
     @GridDirectCollection(KeyCacheObject.class)
     private List<KeyCacheObject> nearEvicted;
 
+    /** Partition. NOTE(review): no serialization change for this field is visible in this patch - confirm. */
+    private int part;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -69,10 +72,12 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
     /**
      * @param cacheId Cache ID.
      * @param futVer Future version.
+     * @param part Partition.
      */
-    public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer) {
+    public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer, int part) {
         this.cacheId = cacheId;
         this.futVer = futVer;
+        this.part = part;
     }
 
     /** {@inheritDoc} */
@@ -89,7 +94,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
     /**
      * Sets update error.
-     * @param err
+     *
+     * @param err Error.
      */
     public void onError(IgniteCheckedException err){
         this.err = err;
@@ -110,6 +116,13 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
     }
 
     /**
+     * @return Partition.
+     */
+    public int partition() {
+        return part;
+    }
+
+    /**
      * Adds key to collection of failed keys.
      *
      * @param key Key to add.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 4c8a161..63818f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /** Mappings. */
     @GridToStringInclude
-    private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
+    private ConcurrentMap<MappingKey, GridNearAtomicUpdateRequest> mappings;
 
     /** Error. */
     private volatile CachePartialUpdateCheckedException err;
@@ -246,7 +246,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /** {@inheritDoc} */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
+        return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() {
+            @Override public ClusterNode apply(MappingKey mappingKey) {
+                return cctx.kernalContext().discovery().node(mappingKey.nodeId);
+            }
+        }), F.notNull());
     }
 
     /**
@@ -283,13 +287,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             return false;
         }
 
-        GridNearAtomicUpdateRequest req = mappings.get(nodeId);
+        Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size());
+        Collection<KeyCacheObject> failedKeys = new ArrayList<>();
+
+        for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+            if (e.getKey().nodeId.equals(nodeId)) {
+                mappingKeys.add(e.getKey());
 
-        if (req != null) {
-            addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " +
-                "received: " + nodeId));
+                failedKeys.addAll(e.getValue().keys());
+            }
+        }
 
-            mappings.remove(nodeId);
+        if (!mappingKeys.isEmpty()) {
+            if (!failedKeys.isEmpty())
+                addFailedKeys(failedKeys, new ClusterTopologyCheckedException("Primary node left grid before " +
+                    "response is received: " + nodeId));
+
+            for (MappingKey key : mappingKeys)
+                mappings.remove(key);
 
             checkComplete();
 
@@ -529,7 +544,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
         }
         else {
-            GridNearAtomicUpdateRequest req = mappings.get(nodeId);
+            MappingKey mappingKey = new MappingKey(nodeId, res.partition());
+
+            GridNearAtomicUpdateRequest req = mappings.get(mappingKey);
 
             if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft.
                 updateNear(req, res);
@@ -547,7 +564,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                         opRes = ret;
                 }
 
-                mappings.remove(nodeId);
+                mappings.remove(mappingKey);
             }
 
             checkComplete();
@@ -763,7 +780,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             if (op != TRANSFORM)
                 val = cctx.toCacheObject(val);
 
-            ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+            int part = cctx.affinity().partition(cacheKey);
+            ClusterNode primary = cctx.affinity().primary(part, topVer);
+
+            if (!ccfg.isAtomicOrderedUpdates())
+                part = -1;
 
             if (primary == null) {
                 onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
@@ -789,7 +810,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 subjId,
                 taskNameHash,
                 skipStore,
-                cctx.kernalContext().clientNode());
+                cctx.kernalContext().clientNode(),
+                part);
 
             req.addUpdateEntry(cacheKey,
                 val,
@@ -805,7 +827,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
 
             // Optimize mapping for single key.
-            mapSingle(primary.id(), req);
+            mapSingle(new MappingKey(primary.id(), part), req);
 
             return;
         }
@@ -825,13 +847,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         if (conflictRmvVals != null)
             conflictRmvValsIt = conflictRmvVals.iterator();
 
-        Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
+        Map<MappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
 
         // Must do this in synchronized block because we need to atomically remove and add mapping.
         // Otherwise checkComplete() may see empty intermediate state.
         synchronized (this) {
-            if (oldNodeId != null)
-                removeMapping(oldNodeId);
+            if (oldNodeId != null) {
+                // TODO: IGNITE-104 - Try to avoid iteration.
+                for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+                    if (e.getKey().nodeId.equals(oldNodeId))
+                        mappings.remove(e.getKey());
+                }
+            }
 
             // For fastMap mode wait for all responses before remapping.
             if (remap && fastMap && !mappings.isEmpty()) {
@@ -901,7 +928,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 if (op != TRANSFORM)
                     val = cctx.toCacheObject(val);
 
-                Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+                T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, topVer, fastMap);
+
+                int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1;
+                Collection<ClusterNode> affNodes = t.get2();
 
                 if (affNodes.isEmpty()) {
                     onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
@@ -922,7 +952,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                     UUID nodeId = affNode.id();
 
-                    GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+                    MappingKey mappingKey = new MappingKey(nodeId, part);
+
+                    GridNearAtomicUpdateRequest mapped = pendingMappings.get(mappingKey);
 
                     if (mapped == null) {
                         mapped = new GridNearAtomicUpdateRequest(
@@ -942,11 +974,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                             subjId,
                             taskNameHash,
                             skipStore,
-                            cctx.kernalContext().clientNode());
+                            cctx.kernalContext().clientNode(),
+                            part);
 
-                        pendingMappings.put(nodeId, mapped);
+                        pendingMappings.put(mappingKey, mapped);
 
-                        GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped);
+                        GridNearAtomicUpdateRequest old = mappings.put(mappingKey, mapped);
 
                         assert old == null || (old != null && remap) :
                             "Invalid mapping state [old=" + old + ", remap=" + remap + ']';
@@ -964,7 +997,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         }
 
         if ((single == null || single) && pendingMappings.size() == 1) {
-            Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
+            Map.Entry<MappingKey, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
 
             single = true;
 
@@ -987,31 +1020,35 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
      * @return Collection of nodes to which key is mapped.
      */
-    private Collection<ClusterNode> mapKey(
+    private T2<Integer, Collection<ClusterNode>> mapKey(
         KeyCacheObject key,
         AffinityTopologyVersion topVer,
         boolean fastMap
     ) {
         GridCacheAffinityManager affMgr = cctx.affinity();
 
+        int part = affMgr.partition(key);
+
         // If we can send updates in parallel - do it.
-        return fastMap ?
-            cctx.topology().nodes(affMgr.partition(key), topVer) :
-            Collections.singletonList(affMgr.primary(key, topVer));
+        Collection<ClusterNode> nodes = fastMap ?
+            cctx.topology().nodes(part, topVer) :
+            Collections.singletonList(affMgr.primary(part, topVer));
+
+        return new T2<>(part, nodes);
     }
 
     /**
      * Maps future to single node.
      *
-     * @param nodeId Node ID.
+     * @param mappingKey Mapping key.
      * @param req Request.
      */
-    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
-        singleNodeId = nodeId;
+    private void mapSingle(MappingKey mappingKey, GridNearAtomicUpdateRequest req) {
+        singleNodeId = mappingKey.nodeId;
         singleReq = req;
 
-        if (cctx.localNodeId().equals(nodeId)) {
-            cache.updateAllAsyncInternal(nodeId, req,
+        if (cctx.localNodeId().equals(mappingKey.nodeId)) {
+            cache.updateAllAsyncInternal(mappingKey.nodeId, req,
                 new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
                     @Override public void apply(GridNearAtomicUpdateRequest req,
                         GridNearAtomicUpdateResponse res) {
@@ -1026,7 +1063,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 if (log.isDebugEnabled())
                     log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                sendRequest(mappingKey, req);
 
                 if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
                     onDone(new GridCacheReturn(cctx, true, null, true));
@@ -1042,34 +1079,37 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      *
      * @param mappings Mappings to send.
      */
-    private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
+    private void doUpdate(Map<MappingKey, GridNearAtomicUpdateRequest> mappings) {
         UUID locNodeId = cctx.localNodeId();
 
-        GridNearAtomicUpdateRequest locUpdate = null;
+        Collection<GridNearAtomicUpdateRequest> locUpdates = null;
 
         // Send messages to remote nodes first, then run local update.
-        for (GridNearAtomicUpdateRequest req : mappings.values()) {
+        for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+            MappingKey mappingKey = e.getKey();
+            GridNearAtomicUpdateRequest req = e.getValue();
+
             if (locNodeId.equals(req.nodeId())) {
-                assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
-                    ", req=" + req + ']';
+                if (locUpdates == null)
+                    locUpdates = new ArrayList<>(mappings.size());
 
-                locUpdate = req;
+                locUpdates.add(req);
             }
             else {
                 try {
                     if (log.isDebugEnabled())
                         log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                    sendRequest(mappingKey, req);
                 }
-                catch (IgniteCheckedException e) {
-                    addFailedKeys(req.keys(), e);
+                catch (IgniteCheckedException ex) {
+                    addFailedKeys(req.keys(), ex);
 
-                    removeMapping(req.nodeId());
+                    removeMapping(mappingKey);
                 }
 
                 if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
-                    removeMapping(req.nodeId());
+                    removeMapping(mappingKey);
             }
         }
 
@@ -1077,28 +1117,50 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             // In FULL_ASYNC mode always return (null, true).
             opRes = new GridCacheReturn(cctx, true, null, true);
 
-        if (locUpdate != null) {
-            cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
-                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest req,
-                        GridNearAtomicUpdateResponse res) {
-                        assert res.futureVersion().equals(futVer) : futVer;
+        if (locUpdates != null) {
+            for (GridNearAtomicUpdateRequest locUpdate : locUpdates) {
+                cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+                    new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                        @Override public void apply(GridNearAtomicUpdateRequest req,
+                            GridNearAtomicUpdateResponse res) {
+                            assert res.futureVersion().equals(futVer) : futVer;
 
-                        onResult(res.nodeId(), res);
-                    }
-                });
+                            onResult(res.nodeId(), res);
+                        }
+                    });
+            }
         }
 
         checkComplete();
     }
 
     /**
+     * Sends request.
+     *
+     * @param mappingKey Mapping key.
+     * @param req Update request.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void sendRequest(MappingKey mappingKey, GridNearAtomicUpdateRequest req) throws IgniteCheckedException {
+        if (mappingKey.part >= 0) { // Non-negative partition: send as an ordered message.
+            Object topic = CU.partitionMassageTopic(cctx, mappingKey.part); // NOTE(review): "Massage" - typo?
+
+            cctx.io().sendOrderedMessage(mappingKey.nodeId, topic, req, cctx.ioPolicy(), 0);
+        }
+        else {
+            assert mappingKey.part == -1; // -1 is the only negative partition MappingKey accepts.
+
+            cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); // Plain (unordered) send.
+        }
+    }
+
+    /**
      * Removes mapping from future mappings map.
      *
-     * @param nodeId Node ID to remove mapping for.
+     * @param mappingKey Mapping key.
      */
-    private void removeMapping(UUID nodeId) {
-        mappings.remove(nodeId);
+    private void removeMapping(MappingKey mappingKey) {
+        mappings.remove(mappingKey);
     }
 
     /**
@@ -1142,4 +1204,53 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     public String toString() {
         return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());
     }
+
+    /**
+     */
+    private static class MappingKey { // Immutable (node ID, partition) pair used as the key of the mappings map.
+        /** ID of the node the mapped request targets (asserted non-null in the constructor). */
+        private final UUID nodeId;
+
+        /** Partition, or -1 when the request is not sent as an ordered message. */
+        private final int part;
+
+        /**
+         * @param nodeId Node ID, must not be {@code null}.
+         * @param part Partition, must be {@code >= -1}.
+         */
+        private MappingKey(UUID nodeId, int part) {
+            assert nodeId != null;
+            assert part >= -1 : part;
+
+            this.nodeId = nodeId;
+            this.part = part;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            MappingKey key = (MappingKey)o;
+
+            return nodeId.equals(key.nodeId) && part == key.part; // Equal only if both components match.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = nodeId.hashCode();
+
+            res = 31 * res + part; // Combine both components, consistent with equals().
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MappingKey.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 86c5ab8..93429c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -135,6 +135,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     /** */
     private boolean clientReq;
 
+    /** Partition. NOTE(review): no serialization change for this field is visible in this patch - confirm. */
+    private int part;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -162,6 +165,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param clientReq Client node request flag.
+     * @param part Partition.
      */
     public GridNearAtomicUpdateRequest(
         int cacheId,
@@ -180,7 +184,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         @Nullable UUID subjId,
         int taskNameHash,
         boolean skipStore,
-        boolean clientReq
+        boolean clientReq,
+        int part
     ) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
@@ -200,6 +205,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         this.taskNameHash = taskNameHash;
         this.skipStore = skipStore;
         this.clientReq = clientReq;
+        this.part = part;
 
         keys = new ArrayList<>();
     }
@@ -315,6 +321,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     }
 
     /**
+     * @return Partition.
+     */
+    public int partition() {
+        return part;
+    }
+
+    /**
      * @param key Key to add.
      * @param val Optional update value.
      * @param conflictTtl Conflict TTL (optional).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 330e43c..404670a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -92,6 +92,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /** Near expire times. */
     private GridLongList nearExpireTimes;
 
+    /** Partition. NOTE(review): no serialization change for this field is visible in this patch - confirm. */
+    private int part;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -103,11 +106,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param cacheId Cache ID.
      * @param nodeId Node ID this reply should be sent to.
      * @param futVer Future version.
+     * @param part Partition.
      */
-    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) {
+    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, int part) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
         this.futVer = futVer;
+        this.part = part;
     }
 
     /** {@inheritDoc} */
@@ -188,6 +193,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     }
 
     /**
+     * @return Partition.
+     */
+    public int partition() {
+        return part;
+    }
+
+    /**
      * Adds value to be put in near cache on originating node.
      *
      * @param keyIdx Key index.


Mime
View raw message