ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [3/5] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
Date Fri, 31 Jul 2015 00:26:10 GMT
IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2d16d99f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2d16d99f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2d16d99f

Branch: refs/heads/ignite-104
Commit: 2d16d99f64fdfbff591124abcb4c5d42ac29d8bf
Parents: dad4691
Author: Valentin Kulichenko <vkulichenko@gridgain.com>
Authored: Thu Jul 30 16:48:30 2015 -0700
Committer: Valentin Kulichenko <vkulichenko@gridgain.com>
Committed: Thu Jul 30 16:48:30 2015 -0700

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   2 +
 .../org/apache/ignite/internal/GridTopic.java   |  83 --------
 .../managers/communication/GridIoManager.java   | 199 ++++++++++++++++++-
 .../managers/communication/GridIoMessage.java   |  48 +++--
 .../processors/cache/GridCacheIoManager.java    |  99 +++++++--
 .../processors/cache/GridCacheUtils.java        |  11 -
 .../dht/atomic/GridAtomicMappingKey.java        |  86 ++++++++
 .../dht/atomic/GridAtomicRequestTopic.java      |  96 +++++++++
 .../dht/atomic/GridDhtAtomicCache.java          |  33 ++-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  89 ++-------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 172 +++++++++++-----
 .../dht/atomic/GridNearAtomicUpdateRequest.java |  63 ++++--
 .../atomic/GridNearAtomicUpdateResponse.java    |  36 +++-
 .../preloader/GridDhtPartitionDemandPool.java   |  15 +-
 .../query/GridCacheDistributedQueryManager.java |   8 +-
 .../resources/META-INF/classnames.properties    |   2 +-
 16 files changed, 730 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index c560118..2510d65 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -142,6 +142,8 @@ public class MessageCodeGenerator {
 
         MessageCodeGenerator gen = new MessageCodeGenerator(srcDir);
 
+//        gen.generateAndWrite(GridIoMessage.class);
+
 //        gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
 //        gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
 //        gen.generateAndWrite(GridDhtAtomicUpdateRequest.class);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index e9da40c..56aea1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -183,15 +183,6 @@ public enum GridTopic {
     }
 
     /**
-     * @param id1 ID1.
-     * @param id2 ID2.
-     * @return Grid message topic with specified IDs.
-     */
-    public Object topic(int id1, int id2) {
-        return new T9(this, id1, id2);
-    }
-
-    /**
      *
      */
     private static class T1 implements Externalizable {
@@ -765,78 +756,4 @@ public enum GridTopic {
             return S.toString(T8.class, this);
         }
     }
-
-    /**
-     */
-    private static class T9 implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private GridTopic topic;
-
-        /** */
-        private int id1;
-
-        /** */
-        private int id2;
-
-        /**
-         * No-arg constructor needed for {@link Serializable}.
-         */
-        public T9() {
-            // No-op.
-        }
-
-        /**
-         * @param topic Topic.
-         * @param id1 ID1.
-         * @param id2 ID2.
-         */
-        private T9(GridTopic topic, int id1, int id2) {
-            this.topic = topic;
-            this.id1 = id1;
-            this.id2 = id2;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = topic.ordinal();
-
-            res += 31 * res + id1;
-            res += 31 * res + id2;
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            if (obj.getClass() == T9.class) {
-                T9 that = (T9)obj;
-
-                return topic == that.topic && id1 == that.id1 && id2 == that.id2;
-            }
-
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeByte(topic.ordinal());
-            out.writeInt(id1);
-            out.writeInt(id2);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            topic = fromOrdinal(in.readByte());
-            id1 = in.readInt();
-            id2 = in.readInt();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(T9.class, this);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index c1fb79a..765ba65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -100,6 +100,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private final ConcurrentMap<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> msgSetMap =
         new ConcurrentHashMap8<>();
 
+    /** */
+    private final ConcurrentMap<Object, SequentialMessageSet> seqMsgs = new ConcurrentHashMap8<>();
+
     /** Local node ID. */
     private final UUID locNodeId;
 
@@ -576,6 +579,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 {
                     if (msg.isOrdered())
                         processOrderedMessage(nodeId, msg, plc, msgC);
+                    else if (msg.isSequential())
+                        processSequentialMessage(nodeId, msg, plc, msgC);
                     else
                         processRegularMessage(nodeId, msg, plc, msgC);
 
@@ -591,6 +596,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
                     if (msg.isOrdered())
                         processOrderedMessage(nodeId, msg, plc, msgC);
+                    else if (msg.isSequential())
+                        processSequentialMessage(nodeId, msg, plc, msgC);
                     else
                         processRegularMessage(nodeId, msg, plc, msgC);
             }
@@ -963,6 +970,78 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param msg Message.
+     * @param plc Execution policy.
+     * @param msgC Closure to call when message processing finished.
+     */
+    private void processSequentialMessage(
+        final UUID nodeId,
+        final GridIoMessage msg,
+        byte plc,
+        final IgniteRunnable msgC
+    ) throws IgniteCheckedException {
+        final GridMessageListener lsnr = lsnrMap.get(msg.topic());
+
+        if (lsnr == null) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring message because listener is not found: " + msg);
+
+            if (msgC != null)
+                msgC.run();
+
+            return;
+        }
+
+        SequentialMessageSet msgSet = seqMsgs.get(msg.topic());
+
+        if (msgSet == null) {
+            SequentialMessageSet old = seqMsgs.putIfAbsent(msg.topic(), msgSet = new SequentialMessageSet());
+
+            if (old != null)
+                msgSet = old;
+        }
+
+        msgSet.add(nodeId, msg, msgC);
+
+        if (msgC == null) {
+            assert locNodeId.equals(nodeId);
+
+            msgSet.unwind(lsnr);
+        }
+        else {
+            assert !locNodeId.equals(nodeId);
+
+            final SequentialMessageSet msgSet0 = msgSet;
+
+            Runnable c = new Runnable() {
+                @Override public void run() {
+                    try {
+                        threadProcessingMessage(true);
+
+                        msgSet0.unwind(lsnr);
+                    }
+                    finally {
+                        threadProcessingMessage(false);
+                    }
+                }
+            };
+
+            try {
+                pool(plc).execute(c);
+            }
+            catch (RejectedExecutionException e) {
+                U.error(log, "Failed to process sequential message due to execution rejection. " +
+                    "Increase the upper bound on executor service provided by corresponding " +
+                    "configuration property. Will attempt to process message in the listener " +
+                    "thread instead [msgPlc=" + plc + ']', e);
+
+                c.run();
+            }
+        }
+    }
+
+    /**
      * @param node Destination node.
      * @param topic Topic to send the message to.
      * @param topicOrd GridTopic enumeration ordinal.
@@ -980,6 +1059,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         Message msg,
         byte plc,
         boolean ordered,
+        boolean seq,
         long timeout,
         boolean skipOnTimeout
     ) throws IgniteCheckedException {
@@ -987,7 +1067,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         assert topic != null;
         assert msg != null;
 
-        GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
+        GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, seq, timeout, skipOnTimeout);
 
         if (locNodeId.equals(node.id())) {
             assert plc != P2P_POOL;
@@ -999,6 +1079,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             if (ordered)
                 processOrderedMessage(locNodeId, ioMsg, plc, null);
+            else if (seq)
+                processSequentialMessage(locNodeId, ioMsg, plc, null);
             else
                 processRegularMessage0(ioMsg, locNodeId);
         }
@@ -1050,7 +1132,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false);
     }
 
     /**
@@ -1062,7 +1144,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false);
+        send(node, topic, -1, msg, plc, false, false, 0, false);
     }
 
     /**
@@ -1074,7 +1156,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false);
     }
 
     /**
@@ -1096,7 +1178,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+        send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout);
     }
 
     /**
@@ -1123,7 +1205,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+        send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout);
     }
 
     /**
@@ -1146,7 +1228,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
+        send(nodes, topic, -1, msg, plc, true, false, timeout, skipOnTimeout);
     }
 
     /**
@@ -1162,7 +1244,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         Message msg,
         byte plc
     ) throws IgniteCheckedException {
-        send(nodes, topic, -1, msg, plc, false, 0, false);
+        send(nodes, topic, -1, msg, plc, false, false, 0, false);
     }
 
     /**
@@ -1178,7 +1260,48 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         Message msg,
         byte plc
     ) throws IgniteCheckedException {
-        send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false);
+        send(nodes, topic, topic.ordinal(), msg, plc, false, false, 0, false);
+    }
+
+    /**
+     * Sends sequential message.
+     *
+     * @param nodeId Destination node ID.
+     * @param topic Topic.
+     * @param msg Message.
+     * @param plc Policy.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void sendSequentialMessage(
+        UUID nodeId,
+        Object topic,
+        Message msg,
+        byte plc
+    ) throws IgniteCheckedException {
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node == null)
+            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+
+        sendSequentialMessage(node, topic, msg, plc);
+    }
+
+    /**
+     * Sends sequential message.
+     *
+     * @param node Destination node.
+     * @param topic Topic.
+     * @param msg Message.
+     * @param plc Policy.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void sendSequentialMessage(
+        ClusterNode node,
+        Object topic,
+        Message msg,
+        byte plc
+    ) throws IgniteCheckedException {
+        send(node, topic, -1, msg, plc, false, true, 0, false);
     }
 
     /**
@@ -1307,6 +1430,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param msg Message to send.
      * @param plc Type of processing.
      * @param ordered Ordered flag.
+     * @param seq Sequential message flag.
      * @param timeout Message timeout.
      * @param skipOnTimeout Whether message can be skipped in timeout.
      * @throws IgniteCheckedException Thrown in case of any errors.
@@ -1318,6 +1442,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         Message msg,
         byte plc,
         boolean ordered,
+        boolean seq,
         long timeout,
         boolean skipOnTimeout
     ) throws IgniteCheckedException {
@@ -1334,7 +1459,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             // messages to one node vs. many.
             if (!nodes.isEmpty()) {
                 for (ClusterNode node : nodes)
-                    send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout);
+                    send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout);
             }
             else if (log.isDebugEnabled())
                 log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
@@ -2216,4 +2341,58 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             return S.toString(DelayedMessage.class, this, super.toString());
         }
     }
+
+    /**
+     */
+    private static class SequentialMessageSet {
+        /** */
+        private final Queue<GridTuple3<UUID, GridIoMessage, IgniteRunnable>> queue = new ConcurrentLinkedDeque8<>();
+
+        /** */
+        private final AtomicBoolean reserve = new AtomicBoolean();
+
+        /**
+         * @param nodeId Node ID.
+         * @param msg Message.
+         * @param msgC Closure to call when message processing finished.
+         */
+        void add(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) {
+            queue.add(F.t(nodeId, msg, msgC));
+        }
+
+        /**
+         * @param lsnr Message listener.
+         */
+        void unwind(GridMessageListener lsnr) {
+            assert lsnr != null;
+
+            while (true) {
+                if (reserve.compareAndSet(false, true)) {
+                    try {
+                        GridTuple3<UUID, GridIoMessage, IgniteRunnable> t;
+
+                        while ((t = queue.poll()) != null) {
+                            try {
+                                lsnr.onMessage(t.get1(), t.get2().message());
+                            }
+                            finally {
+                                IgniteRunnable msgC = t.get3();
+
+                                if (msgC != null)
+                                    msgC.run();
+                            }
+                        }
+                    }
+                    finally {
+                        reserve.set(false);
+                    }
+
+                    if (queue.isEmpty())
+                        return;
+                }
+                else
+                    return;
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index 6cf1ae5..d729f75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -49,6 +49,9 @@ public class GridIoMessage implements Message {
     /** Message ordered flag. */
     private boolean ordered;
 
+    /** Sequential message flag. */
+    private boolean seq;
+
     /** Message timeout. */
     private long timeout;
 
@@ -72,6 +75,7 @@ public class GridIoMessage implements Message {
      * @param topicOrd Topic ordinal value.
      * @param msg Message.
      * @param ordered Message ordered flag.
+     * @param seq Sequential message flag.
      * @param timeout Timeout.
      * @param skipOnTimeout Whether message can be skipped on timeout.
      */
@@ -81,18 +85,21 @@ public class GridIoMessage implements Message {
         int topicOrd,
         Message msg,
         boolean ordered,
+        boolean seq,
         long timeout,
         boolean skipOnTimeout
     ) {
         assert topic != null;
         assert topicOrd <= Byte.MAX_VALUE;
         assert msg != null;
+        assert !ordered || !seq; // Message can't be ordered and sequential at the same time.
 
         this.plc = plc;
         this.msg = msg;
         this.topic = topic;
         this.topicOrd = topicOrd;
         this.ordered = ordered;
+        this.seq = seq;
         this.timeout = timeout;
         this.skipOnTimeout = skipOnTimeout;
     }
@@ -167,6 +174,13 @@ public class GridIoMessage implements Message {
         return ordered;
     }
 
+    /**
+     * @return Sequential message flag.
+     */
+    boolean isSequential() {
+        return seq;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean equals(Object obj) {
         throw new AssertionError();
@@ -208,24 +222,30 @@ public class GridIoMessage implements Message {
                 writer.incrementState();
 
             case 3:
-                if (!writer.writeBoolean("skipOnTimeout", skipOnTimeout))
+                if (!writer.writeBoolean("seq", seq))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeLong("timeout", timeout))
+                if (!writer.writeBoolean("skipOnTimeout", skipOnTimeout))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeByteArray("topicBytes", topicBytes))
+                if (!writer.writeLong("timeout", timeout))
                     return false;
 
                 writer.incrementState();
 
             case 6:
+                if (!writer.writeByteArray("topicBytes", topicBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
                 if (!writer.writeInt("topicOrd", topicOrd))
                     return false;
 
@@ -261,19 +281,15 @@ public class GridIoMessage implements Message {
                 reader.incrementState();
 
             case 2:
-                byte plc0;
-
-                plc0 = reader.readByte("plc");
+                plc = reader.readByte("plc");
 
                 if (!reader.isLastRead())
                     return false;
 
-                plc = plc0;
-
                 reader.incrementState();
 
             case 3:
-                skipOnTimeout = reader.readBoolean("skipOnTimeout");
+                seq = reader.readBoolean("seq");
 
                 if (!reader.isLastRead())
                     return false;
@@ -281,7 +297,7 @@ public class GridIoMessage implements Message {
                 reader.incrementState();
 
             case 4:
-                timeout = reader.readLong("timeout");
+                skipOnTimeout = reader.readBoolean("skipOnTimeout");
 
                 if (!reader.isLastRead())
                     return false;
@@ -289,7 +305,7 @@ public class GridIoMessage implements Message {
                 reader.incrementState();
 
             case 5:
-                topicBytes = reader.readByteArray("topicBytes");
+                timeout = reader.readLong("timeout");
 
                 if (!reader.isLastRead())
                     return false;
@@ -297,6 +313,14 @@ public class GridIoMessage implements Message {
                 reader.incrementState();
 
             case 6:
+                topicBytes = reader.readByteArray("topicBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
                 topicOrd = reader.readInt("topicOrd");
 
                 if (!reader.isLastRead())
@@ -316,7 +340,7 @@ public class GridIoMessage implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 5858424..490a5d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -62,8 +62,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>
         clsHandlers = new ConcurrentHashMap8<>();
 
-    /** Ordered handler registry. */
-    private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers =
+    /** Per topic handler registry. */
+    private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> perTopicHandlers =
         new ConcurrentHashMap8<>();
 
     /** Stopping flag. */
@@ -173,7 +173,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     @Override protected void onKernalStop0(boolean cancel) {
         cctx.gridIO().removeMessageListener(TOPIC_CACHE);
 
-        for (Object ordTopic : orderedHandlers.keySet())
+        for (Object ordTopic : perTopicHandlers.keySet())
             cctx.gridIO().removeMessageListener(ordTopic);
 
         boolean interrupted = false;
@@ -394,7 +394,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
-                    req.futureVersion());
+                    req.futureVersion(),
+                    req.partition());
 
                 res.error(req.classError());
 
@@ -813,6 +814,64 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param nodeId Destination node ID.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc IO policy.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void sendSequentialMessage(UUID nodeId, Object topic, GridCacheMessage msg, byte plc)
+        throws IgniteCheckedException {
+        ClusterNode n = cctx.discovery().node(nodeId);
+
+        if (n == null)
+            throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" +
+                nodeId + ", msg=" + msg + ']');
+
+        sendSequentialMessage(n, topic, msg, plc);
+    }
+
+    /**
+     * @param node Destination node.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc IO policy.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void sendSequentialMessage(ClusterNode node, Object topic, GridCacheMessage msg, byte plc)
+        throws IgniteCheckedException {
+        onSend(msg, node.id());
+
+        int cnt = 0;
+
+        while (cnt <= retryCnt) {
+            try {
+                cnt++;
+
+                cctx.gridIO().sendSequentialMessage(node, topic, msg, plc);
+
+                if (log.isDebugEnabled())
+                    log.debug("Sent sequential cache message [topic=" + topic + ", msg=" + msg +
+                        ", nodeId=" + node.id() + ']');
+
+                return;
+            }
+            catch (IgniteCheckedException e) {
+                if (cctx.discovery().node(node.id()) == null)
+                    throw new ClusterTopologyCheckedException("Node left grid while sending sequential message [" +
+                        "nodeId=" + node.id() + ", msg=" + msg + ']', e);
+
+                if (cnt == retryCnt)
+                    throw e;
+                else if (log.isDebugEnabled())
+                    log.debug("Failed to send message to node (will retry): " + node.id());
+            }
+
+            U.sleep(retryDelay);
+        }
+    }
+
+    /**
      * @return ID that auto-grows based on local counter and counters received
      *      from other nodes.
      */
@@ -940,39 +999,39 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * Adds ordered message handler.
+     * Adds per topic message handler.
      *
      * @param topic Topic.
      * @param c Handler.
      */
     @SuppressWarnings({"unchecked"})
-    public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
-        if (orderedHandlers.putIfAbsent(topic, c) == null) {
-            cctx.gridIO().addMessageListener(topic, new OrderedMessageListener(
+    public void addPerTopicHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+        if (perTopicHandlers.putIfAbsent(topic, c) == null) {
+            cctx.gridIO().addMessageListener(topic, new PerTopicMessageListener(
                 (IgniteBiInClosure<UUID, GridCacheMessage>)c));
 
             if (log != null && log.isDebugEnabled())
-                log.debug("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']');
+                log.debug("Registered per topic cache communication handler [topic=" + topic + ", handler=" + c + ']');
         }
         else if (log != null)
-            U.warn(log, "Failed to register ordered cache communication handler because it is already " +
+            U.warn(log, "Failed to register per topic cache communication handler because it is already " +
                 "registered for this topic [topic=" + topic + ", handler=" + c + ']');
     }
 
     /**
-     * Removed ordered message handler.
+     * Removed per topic message handler.
      *
      * @param topic Topic.
      */
-    public void removeOrderedHandler(Object topic) {
-        if (orderedHandlers.remove(topic) != null) {
+    public void removePerTopicHandler(Object topic) {
+        if (perTopicHandlers.remove(topic) != null) {
             cctx.gridIO().removeMessageListener(topic);
 
             if (log != null && log.isDebugEnabled())
-                log.debug("Unregistered ordered cache communication handler for topic:" + topic);
+                log.debug("Unregistered per topic cache communication handler for topic:" + topic);
         }
         else if (log != null)
-            U.warn(log, "Failed to unregister ordered cache communication handler because it was not found " +
+            U.warn(log, "Failed to unregister per topic cache communication handler because it was not found " +
                 "for topic: " + topic);
     }
 
@@ -1019,20 +1078,20 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         X.println(">>> ");
         X.println(">>> Cache IO manager memory stats [grid=" + cctx.gridName() + ']');
         X.println(">>>   clsHandlersSize: " + clsHandlers.size());
-        X.println(">>>   orderedHandlersSize: " + orderedHandlers.size());
+        X.println(">>>   perTopicHandlersSize: " + perTopicHandlers.size());
     }
 
     /**
-     * Ordered message listener.
+     * Per topic message listener.
      */
-    private class OrderedMessageListener implements GridMessageListener {
+    private class PerTopicMessageListener implements GridMessageListener {
         /** */
         private final IgniteBiInClosure<UUID, GridCacheMessage> c;
 
         /**
          * @param c Handler closure.
          */
-        OrderedMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> c) {
+        PerTopicMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> c) {
             this.c = c;
         }
 
@@ -1040,7 +1099,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         @SuppressWarnings({"CatchGenericClass", "unchecked"})
         @Override public void onMessage(final UUID nodeId, Object msg) {
             if (log.isDebugEnabled())
-                log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
+                log.debug("Received per topic cache message [nodeId=" + nodeId + ", msg=" + msg + ']');
 
             final GridCacheMessage cacheMsg = (GridCacheMessage)msg;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index d82acca..a313e3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1733,15 +1733,4 @@ public class GridCacheUtils {
             }
         };
     }
-
-    /**
-     * @param ctx Cache context.
-     * @param part Partition.
-     * @return Per-partition message topic.
-     */
-    public static Object partitionMessageTopic(GridCacheContext ctx, int part) {
-        assert part >= 0;
-
-        return TOPIC_CACHE.topic(ctx.cacheId(), part);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
new file mode 100644
index 0000000..52e3c7f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Key of an atomic update mapping: a node ID paired with a partition; compared by both values.
+ */
+class GridAtomicMappingKey {
+    /** Node ID (asserted non-null by the constructor). */
+    private final UUID nodeId;
+
+    /** Partition, or {@code -1} when the mapping is not bound to a particular partition. */
+    private final int part;
+
+    /**
+     * @param nodeId Node ID, must not be {@code null}.
+     * @param part Partition, must be {@code -1} or greater.
+     */
+    GridAtomicMappingKey(UUID nodeId, int part) {
+        assert nodeId != null;
+        assert part >= -1 : part;
+
+        this.nodeId = nodeId;
+        this.part = part;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Partition ({@code -1} or greater).
+     */
+    int partition() {
+        return part;
+    }
+
+    /** {@inheritDoc} Keys are equal when both node ID and partition match. */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        GridAtomicMappingKey key = (GridAtomicMappingKey)o;
+
+        return nodeId.equals(key.nodeId) && part == key.part;
+    }
+
+    /** {@inheritDoc} Combines the node ID hash with the partition. */
+    @Override public int hashCode() {
+        int res = nodeId.hashCode();
+
+        res = 31 * res + part;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridAtomicMappingKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java
new file mode 100644
index 0000000..9feb409
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/** Message topic for atomic update requests, identified by cache ID, partition and near flag.
+ */
+class GridAtomicRequestTopic implements Externalizable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache ID. */
+    private int cacheId;
+
+    /** Partition. */
+    private int part;
+
+    /** Near flag: {@code true} for the near update request topic, {@code false} for the DHT one. */
+    private boolean near;
+
+    /**
+     * For {@link Externalizable}.
+     */
+    public GridAtomicRequestTopic() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param part Partition.
+     * @param near {@code true} for near update requests, {@code false} for DHT update requests.
+     */
+    GridAtomicRequestTopic(int cacheId, int part, boolean near) {
+        this.cacheId = cacheId;
+        this.part = part;
+        this.near = near;
+    }
+
+    /** {@inheritDoc} Topics are equal when cache ID, partition and near flag all match. */
+    @Override public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        GridAtomicRequestTopic topic = (GridAtomicRequestTopic)o;
+
+        return cacheId == topic.cacheId && part == topic.part && near == topic.near;
+    }
+
+    /** {@inheritDoc} Combines cache ID, partition and near flag. */
+    @Override public int hashCode() {
+        int res = cacheId;
+
+        res = 31 * res + part;
+        res = 31 * res + (near ? 1 : 0);
+
+        return res;
+    }
+
+    /** {@inheritDoc} Writes cache ID, partition and near flag, in that order. */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(cacheId);
+        out.writeInt(part);
+        out.writeBoolean(near);
+    }
+
+    /** {@inheritDoc} Reads cache ID, partition and near flag, in the order they were written. */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        cacheId = in.readInt();
+        part = in.readInt();
+        near = in.readBoolean();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridAtomicRequestTopic.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index be35d00..a010baa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -181,15 +181,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
-            @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
-                processNearAtomicUpdateRequest(nodeId, req);
-            }
-        });
-
         if (ctx.config().isAtomicOrderedUpdates()) {
             for (int part = 0; part < ctx.affinity().partitions(); part++) {
-                ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part), new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+                Object nearTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, true);
+
+                ctx.io().addPerTopicHandler(nearTopic, new CI2<UUID, GridNearAtomicUpdateRequest>() {
+                    @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+                        processNearAtomicUpdateRequest(nodeId, req);
+                    }
+                });
+
+                Object dhtTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, false);
+
+                ctx.io().addPerTopicHandler(dhtTopic, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
                     @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
                         processDhtAtomicUpdateRequest(nodeId, req);
                     }
@@ -197,6 +201,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         else {
+            ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
+                @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+                    processNearAtomicUpdateRequest(nodeId, req);
+                }
+            });
+
             ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
                 @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
                     processDhtAtomicUpdateRequest(nodeId, req);
@@ -238,8 +248,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             buf.finish();
 
         if (ctx.config().isAtomicOrderedUpdates()) {
-            for (int part = 0; part < ctx.affinity().partitions(); part++)
-                ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part));
+            for (int part = 0; part < ctx.affinity().partitions(); part++) {
+                ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, true));
+                ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, false));
+            }
         }
     }
 
@@ -1033,7 +1045,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     ) {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
             nodeId,
-            req.futureVersion());
+            req.futureVersion(),
+            req.partition());
 
         List<KeyCacheObject> keys = req.keys();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 93c20da..c05f4c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -73,7 +73,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** Mappings. */
     @GridToStringInclude
-    private ConcurrentMap<MappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
+    private ConcurrentMap<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
 
     /** Entries with readers. */
     private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
@@ -142,8 +142,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** {@inheritDoc} */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() {
-            @Override public ClusterNode apply(MappingKey mappingKey) {
+        return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() {
+            @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) {
                 return cctx.kernalContext().discovery().node(mappingKey.nodeId());
             }
         }), F.notNull());
@@ -154,15 +154,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         if (log.isDebugEnabled())
             log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']');
 
-        Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size());
+        Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size());
 
-        for (MappingKey mappingKey : mappings.keySet()) {
+        for (GridAtomicMappingKey mappingKey : mappings.keySet()) {
             if (mappingKey.nodeId().equals(nodeId))
                 mappingKeys.add(mappingKey);
         }
 
         if (!mappingKeys.isEmpty()) {
-            for (MappingKey mappingKey : mappingKeys)
+            for (GridAtomicMappingKey mappingKey : mappingKeys)
                 mappings.remove(mappingKey);
 
             checkComplete();
@@ -234,7 +234,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         for (ClusterNode node : dhtNodes) {
             UUID nodeId = node.id();
 
-            MappingKey mappingKey = new MappingKey(nodeId, part);
+            GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
 
             if (!nodeId.equals(cctx.localNodeId())) {
                 GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
@@ -287,7 +287,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1;
 
         for (UUID nodeId : readers) {
-            MappingKey mappingKey = new MappingKey(nodeId, part);
+            GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
 
             GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
 
@@ -345,8 +345,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      */
     public void map() {
         if (!mappings.isEmpty()) {
-            for (Map.Entry<MappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) {
-                MappingKey mappingKey = e.getKey();
+            for (Map.Entry<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) {
+                GridAtomicMappingKey mappingKey = e.getKey();
                 GridDhtAtomicUpdateRequest req = e.getValue();
 
                 UUID nodeId = mappingKey.nodeId();
@@ -359,7 +359,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                         log.debug("Sending DHT atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
 
                     if (part >= 0) {
-                        Object topic = CU.partitionMessageTopic(cctx, part);
+                        Object topic = new GridAtomicRequestTopic(cctx.cacheId(), part, false);
 
                         cctx.io().sendOrderedMessage(nodeId, topic, req, cctx.ioPolicy(),
                             2 * cctx.gridConfig().getNetworkTimeout());
@@ -429,7 +429,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             }
         }
 
-        mappings.remove(new MappingKey(nodeId, updateRes.partition()));
+        mappings.remove(new GridAtomicMappingKey(nodeId, updateRes.partition()));
 
         checkComplete();
     }
@@ -445,7 +445,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
 
         for (Integer part : res.partitions())
-            mappings.remove(new MappingKey(nodeId, part));
+            mappings.remove(new GridAtomicMappingKey(nodeId, part));
 
         checkComplete();
     }
@@ -468,67 +468,4 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         return S.toString(GridDhtAtomicUpdateFuture.class, this);
     }
 
-    /**
-     * Mapping Key.
-     */
-    private static class MappingKey {
-        /** Node ID. */
-        private final UUID nodeId;
-
-        /** Partition. */
-        private final int part;
-
-        /**
-         * @param nodeId Node ID.
-         * @param part Partition.
-         */
-        MappingKey(UUID nodeId, int part) {
-            assert nodeId != null;
-            assert part >= -1 : part;
-
-            this.nodeId = nodeId;
-            this.part = part;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        UUID nodeId() {
-            return nodeId;
-        }
-
-        /**
-         * @return Partition.
-         */
-        int partition() {
-            return part;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            MappingKey key = (MappingKey)o;
-
-            return nodeId.equals(key.nodeId) && part == key.part;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = nodeId.hashCode();
-
-            res = 31 * res + part;
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MappingKey.class, this);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 4c8a161..4642b1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /** Mappings. */
     @GridToStringInclude
-    private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
+    private ConcurrentMap<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings;
 
     /** Error. */
     private volatile CachePartialUpdateCheckedException err;
@@ -246,7 +246,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /** {@inheritDoc} */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
+        return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() {
+            @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) {
+                return cctx.kernalContext().discovery().node(mappingKey.nodeId());
+            }
+        }), F.notNull());
     }
 
     /**
@@ -283,13 +287,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             return false;
         }
 
-        GridNearAtomicUpdateRequest req = mappings.get(nodeId);
+        Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size());
+        Collection<KeyCacheObject> failedKeys = new ArrayList<>();
+
+        for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+            if (e.getKey().nodeId().equals(nodeId)) {
+                mappingKeys.add(e.getKey());
+
+                failedKeys.addAll(e.getValue().keys());
+            }
+        }
 
-        if (req != null) {
-            addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " +
-                "received: " + nodeId));
+        if (!mappingKeys.isEmpty()) {
+            if (!failedKeys.isEmpty())
+                addFailedKeys(failedKeys, new ClusterTopologyCheckedException("Primary node left grid before " +
+                    "response is received: " + nodeId));
 
-            mappings.remove(nodeId);
+            for (GridAtomicMappingKey key : mappingKeys)
+                mappings.remove(key);
 
             checkComplete();
 
@@ -529,7 +544,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
         }
         else {
-            GridNearAtomicUpdateRequest req = mappings.get(nodeId);
+            GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, res.partition());
+
+            GridNearAtomicUpdateRequest req = mappings.get(mappingKey);
 
             if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft.
                 updateNear(req, res);
@@ -547,7 +564,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                         opRes = ret;
                 }
 
-                mappings.remove(nodeId);
+                mappings.remove(mappingKey);
             }
 
             checkComplete();
@@ -763,7 +780,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             if (op != TRANSFORM)
                 val = cctx.toCacheObject(val);
 
-            ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+            int part = cctx.affinity().partition(cacheKey);
+            ClusterNode primary = cctx.affinity().primary(part, topVer);
+
+            if (!ccfg.isAtomicOrderedUpdates())
+                part = -1;
 
             if (primary == null) {
                 onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
@@ -789,7 +810,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 subjId,
                 taskNameHash,
                 skipStore,
-                cctx.kernalContext().clientNode());
+                cctx.kernalContext().clientNode(),
+                part);
 
             req.addUpdateEntry(cacheKey,
                 val,
@@ -805,7 +827,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
 
             // Optimize mapping for single key.
-            mapSingle(primary.id(), req);
+            mapSingle(new GridAtomicMappingKey(primary.id(), part), req);
 
             return;
         }
@@ -825,13 +847,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         if (conflictRmvVals != null)
             conflictRmvValsIt = conflictRmvVals.iterator();
 
-        Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
+        Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
 
         // Must do this in synchronized block because we need to atomically remove and add mapping.
         // Otherwise checkComplete() may see empty intermediate state.
         synchronized (this) {
-            if (oldNodeId != null)
-                removeMapping(oldNodeId);
+            if (oldNodeId != null) {
+                // TODO: IGNITE-104 - Try to avoid iteration.
+                for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+                    if (e.getKey().nodeId().equals(oldNodeId))
+                        mappings.remove(e.getKey());
+                }
+            }
 
             // For fastMap mode wait for all responses before remapping.
             if (remap && fastMap && !mappings.isEmpty()) {
@@ -901,7 +928,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 if (op != TRANSFORM)
                     val = cctx.toCacheObject(val);
 
-                Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+                T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, topVer, fastMap);
+
+                int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1;
+                Collection<ClusterNode> affNodes = t.get2();
 
                 if (affNodes.isEmpty()) {
                     onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
@@ -922,7 +952,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                     UUID nodeId = affNode.id();
 
-                    GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+                    GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
+
+                    GridNearAtomicUpdateRequest mapped = pendingMappings.get(mappingKey);
 
                     if (mapped == null) {
                         mapped = new GridNearAtomicUpdateRequest(
@@ -942,11 +974,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                             subjId,
                             taskNameHash,
                             skipStore,
-                            cctx.kernalContext().clientNode());
+                            cctx.kernalContext().clientNode(),
+                            part);
 
-                        pendingMappings.put(nodeId, mapped);
+                        pendingMappings.put(mappingKey, mapped);
 
-                        GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped);
+                        GridNearAtomicUpdateRequest old = mappings.put(mappingKey, mapped);
 
                         assert old == null || (old != null && remap) :
                             "Invalid mapping state [old=" + old + ", remap=" + remap + ']';
@@ -964,7 +997,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         }
 
         if ((single == null || single) && pendingMappings.size() == 1) {
-            Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
+            Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
 
             single = true;
 
@@ -987,31 +1020,35 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
      * @return Collection of nodes to which key is mapped.
      */
-    private Collection<ClusterNode> mapKey(
+    private T2<Integer, Collection<ClusterNode>> mapKey(
         KeyCacheObject key,
         AffinityTopologyVersion topVer,
         boolean fastMap
     ) {
         GridCacheAffinityManager affMgr = cctx.affinity();
 
+        int part = affMgr.partition(key);
+
         // If we can send updates in parallel - do it.
-        return fastMap ?
-            cctx.topology().nodes(affMgr.partition(key), topVer) :
-            Collections.singletonList(affMgr.primary(key, topVer));
+        Collection<ClusterNode> nodes = fastMap ?
+            cctx.topology().nodes(part, topVer) :
+            Collections.singletonList(affMgr.primary(part, topVer));
+
+        return new T2<>(part, nodes);
     }
 
     /**
      * Maps future to single node.
      *
-     * @param nodeId Node ID.
+     * @param mappingKey Mapping key.
      * @param req Request.
      */
-    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
-        singleNodeId = nodeId;
+    private void mapSingle(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) {
+        singleNodeId = mappingKey.nodeId();
         singleReq = req;
 
-        if (cctx.localNodeId().equals(nodeId)) {
-            cache.updateAllAsyncInternal(nodeId, req,
+        if (cctx.localNodeId().equals(mappingKey.nodeId())) {
+            cache.updateAllAsyncInternal(mappingKey.nodeId(), req,
                 new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
                     @Override public void apply(GridNearAtomicUpdateRequest req,
                         GridNearAtomicUpdateResponse res) {
@@ -1026,7 +1063,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 if (log.isDebugEnabled())
                     log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                sendRequest(mappingKey, req);
 
                 if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
                     onDone(new GridCacheReturn(cctx, true, null, true));
@@ -1042,34 +1079,37 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      *
      * @param mappings Mappings to send.
      */
-    private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
+    private void doUpdate(Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings) {
         UUID locNodeId = cctx.localNodeId();
 
-        GridNearAtomicUpdateRequest locUpdate = null;
+        Collection<GridNearAtomicUpdateRequest> locUpdates = null;
 
         // Send messages to remote nodes first, then run local update.
-        for (GridNearAtomicUpdateRequest req : mappings.values()) {
+        for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+            GridAtomicMappingKey mappingKey = e.getKey();
+            GridNearAtomicUpdateRequest req = e.getValue();
+
             if (locNodeId.equals(req.nodeId())) {
-                assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
-                    ", req=" + req + ']';
+                if (locUpdates == null)
+                    locUpdates = new ArrayList<>(mappings.size());
 
-                locUpdate = req;
+                locUpdates.add(req);
             }
             else {
                 try {
                     if (log.isDebugEnabled())
                         log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                    sendRequest(mappingKey, req);
                 }
-                catch (IgniteCheckedException e) {
-                    addFailedKeys(req.keys(), e);
+                catch (IgniteCheckedException ex) {
+                    addFailedKeys(req.keys(), ex);
 
-                    removeMapping(req.nodeId());
+                    removeMapping(mappingKey);
                 }
 
                 if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
-                    removeMapping(req.nodeId());
+                    removeMapping(mappingKey);
             }
         }
 
@@ -1077,28 +1117,52 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             // In FULL_ASYNC mode always return (null, true).
             opRes = new GridCacheReturn(cctx, true, null, true);
 
-        if (locUpdate != null) {
-            cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
-                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest req,
-                        GridNearAtomicUpdateResponse res) {
-                        assert res.futureVersion().equals(futVer) : futVer;
+        if (locUpdates != null) {
+            for (GridNearAtomicUpdateRequest locUpdate : locUpdates) {
+                cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+                    new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                        @Override public void apply(GridNearAtomicUpdateRequest req,
+                            GridNearAtomicUpdateResponse res) {
+                            assert res.futureVersion().equals(futVer) : futVer;
 
-                        onResult(res.nodeId(), res);
-                    }
-                });
+                            onResult(res.nodeId(), res);
+                        }
+                    });
+            }
         }
 
         checkComplete();
     }
 
     /**
+     * Sends request.
+     *
+     * @param mappingKey Mapping key.
+     * @param req Update request.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void sendRequest(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req)
+        throws IgniteCheckedException {
+        if (mappingKey.partition() >= 0) {
+            Object topic = new GridAtomicRequestTopic(cctx.cacheId(), mappingKey.partition(), true);
+
+            cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(),
+                2 * cctx.gridConfig().getNetworkTimeout());
+        }
+        else {
+            assert mappingKey.partition() == -1;
+
+            cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+        }
+    }
+
+    /**
      * Removes mapping from future mappings map.
      *
-     * @param nodeId Node ID to remove mapping for.
+     * @param mappingKey Mapping key.
      */
-    private void removeMapping(UUID nodeId) {
-        mappings.remove(nodeId);
+    private void removeMapping(GridAtomicMappingKey mappingKey) {
+        mappings.remove(mappingKey);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 86c5ab8..b3075c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -135,6 +135,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     /** */
     private boolean clientReq;
 
+    /** Partition. */
+    private int part;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -162,6 +165,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param clientReq Client node request flag.
+     * @param part Partition.
      */
     public GridNearAtomicUpdateRequest(
         int cacheId,
@@ -180,7 +184,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         @Nullable UUID subjId,
         int taskNameHash,
         boolean skipStore,
-        boolean clientReq
+        boolean clientReq,
+        int part
     ) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
@@ -200,6 +205,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         this.taskNameHash = taskNameHash;
         this.skipStore = skipStore;
         this.clientReq = clientReq;
+        this.part = part;
 
         keys = new ArrayList<>();
     }
@@ -315,6 +321,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     }
 
     /**
+     * @return Partition.
+     */
+    public int partition() {
+        return part;
+    }
+
+    /**
      * @param key Key to add.
      * @param val Optional update value.
      * @param conflictTtl Conflict TTL (optional).
@@ -666,54 +679,60 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeBoolean("retval", retval))
+                if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeBoolean("skipStore", skipStore))
+                if (!writer.writeBoolean("retval", retval))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBoolean("skipStore", skipStore))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeBoolean("topLocked", topLocked))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeBoolean("topLocked", topLocked))
                     return false;
 
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeMessage("updateVer", updateVer))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 24:
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 25:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -844,7 +863,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 16:
-                retval = reader.readBoolean("retval");
+                part = reader.readInt("part");
 
                 if (!reader.isLastRead())
                     return false;
@@ -852,7 +871,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 17:
-                skipStore = reader.readBoolean("skipStore");
+                retval = reader.readBoolean("retval");
 
                 if (!reader.isLastRead())
                     return false;
@@ -860,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 18:
-                subjId = reader.readUuid("subjId");
+                skipStore = reader.readBoolean("skipStore");
 
                 if (!reader.isLastRead())
                     return false;
@@ -868,6 +887,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 19:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -879,7 +906,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 20:
+            case 21:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -887,7 +914,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 21:
+            case 22:
                 topLocked = reader.readBoolean("topLocked");
 
                 if (!reader.isLastRead())
@@ -895,7 +922,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 22:
+            case 23:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -903,7 +930,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 23:
+            case 24:
                 updateVer = reader.readMessage("updateVer");
 
                 if (!reader.isLastRead())
@@ -911,7 +938,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 24:
+            case 25:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -931,7 +958,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 25;
+        return 26;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 8e1bee2..e2d33d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -92,6 +92,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /** Near expire times. */
     private GridLongList nearExpireTimes;
 
+    /** Partition. */
+    private int part;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -103,11 +106,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param cacheId Cache ID.
      * @param nodeId Node ID this reply should be sent to.
      * @param futVer Future version.
+     * @param part Partition.
      */
-    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) {
+    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, int part) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
         this.futVer = futVer;
+        this.part = part;
     }
 
     /** {@inheritDoc} */
@@ -138,7 +143,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
     /**
      * Sets update error.
-     * @param err Exception.
+     * @param err
      */
     public void error(IgniteCheckedException err){
         this.err = err;
@@ -188,6 +193,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     }
 
     /**
+     * @return Partition.
+     */
+    public int partition() {
+        return part;
+    }
+
+    /**
      * Adds value to be put in near cache on originating node.
      *
      * @param keyIdx Key index.
@@ -485,12 +497,18 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
             case 13:
+                if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
                 if (!writer.writeMessage("ret", ret))
                     return false;
 
@@ -585,7 +603,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 12:
-                remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+                part = reader.readInt("part");
 
                 if (!reader.isLastRead())
                     return false;
@@ -593,6 +611,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 13:
+                remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
                 ret = reader.readMessage("ret");
 
                 if (!reader.isLastRead())
@@ -612,7 +638,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 14;
+        return 15;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index a6e6c4d..37824eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -599,7 +599,7 @@ public class GridDhtPartitionDemandPool {
             if (isCancelled() || topologyChanged())
                 return missed;
 
-            cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+            cctx.io().addPerTopicHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
                 @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
                     addMessage(new SupplyMessage(nodeId, msg));
                 }
@@ -641,7 +641,7 @@ public class GridDhtPartitionDemandPool {
                                 growTimeout(timeout);
 
                                 // Ordered listener was removed if timeout expired.
-                                cctx.io().removeOrderedHandler(d.topic());
+                                cctx.io().removePerTopicHandler(d.topic());
 
                                 // Must create copy to be able to work with IO manager thread local caches.
                                 d = new GridDhtPartitionDemandMessage(d, remaining);
@@ -650,13 +650,12 @@ public class GridDhtPartitionDemandPool {
                                 d.topic(topic(++cntr));
 
                                 // Create new ordered listener.
-                                cctx.io().addOrderedHandler(d.topic(),
-                                    new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                                        @Override public void apply(UUID nodeId,
-                                            GridDhtPartitionSupplyMessage msg) {
+                                cctx.io().addPerTopicHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                                        @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
                                             addMessage(new SupplyMessage(nodeId, msg));
                                         }
-                                    });
+                                    }
+                                );
 
                                 // Resend message with larger timeout.
                                 retry = true;
@@ -800,7 +799,7 @@ public class GridDhtPartitionDemandPool {
                 return missed;
             }
             finally {
-                cctx.io().removeOrderedHandler(d.topic());
+                cctx.io().removePerTopicHandler(d.topic());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 316713f..a530a5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -556,11 +556,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
 
             final Object topic = topic(cctx.nodeId(), req.id());
 
-            cctx.io().addOrderedHandler(topic, resHnd);
+            cctx.io().addPerTopicHandler(topic, resHnd);
 
             fut.listen(new CI1<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> fut) {
-                    cctx.io().removeOrderedHandler(topic);
+                    cctx.io().removePerTopicHandler(topic);
                 }
             });
 
@@ -665,11 +665,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
 
             final Object topic = topic(cctx.nodeId(), req.id());
 
-            cctx.io().addOrderedHandler(topic, resHnd);
+            cctx.io().addPerTopicHandler(topic, resHnd);
 
             fut.listen(new CI1<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> fut) {
-                    cctx.io().removeOrderedHandler(topic);
+                    cctx.io().removePerTopicHandler(topic);
                 }
             });
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index df4873a..3ec1d07 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -529,6 +529,7 @@ org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFu
 org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture$MiniFuture$1
 org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture$MiniFuture$2
 org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture$MiniFuture$2$1
+org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridAtomicRequestTopic
 org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache
 org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$10
 org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$11
@@ -806,7 +807,6 @@ org.apache.ignite.internal.processors.closure.GridClosureProcessor$T5
 org.apache.ignite.internal.processors.closure.GridClosureProcessor$T6
 org.apache.ignite.internal.processors.closure.GridClosureProcessor$T7
 org.apache.ignite.internal.processors.closure.GridClosureProcessor$T8
-org.apache.ignite.internal.processors.closure.GridClosureProcessor$T9
 org.apache.ignite.internal.processors.closure.GridClosureProcessor$TaskNoReduceAdapter
 org.apache.ignite.internal.processors.closure.GridPeerDeployAwareTaskAdapter
 org.apache.ignite.internal.processors.continuous.GridContinuousHandler


Mime
View raw message