ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [12/22] ignite git commit: ignite-5075 Implement logical 'cache groups' sharing the same physical caches
Date Tue, 06 Jun 2017 15:06:01 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index bb31645..a251047 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -110,16 +110,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     /** Number of retries used to send messages. */
     private int retryCnt;
 
-    /** Indexed class handlers. */
-    private volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>();
+    /** */
+    private final MessageHandlers cacheHandlers = new MessageHandlers();
 
-    /** Handler registry. */
-    private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>
-        clsHandlers = new ConcurrentHashMap8<>();
-
-    /** Ordered handler registry. */
-    private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers =
-        new ConcurrentHashMap8<>();
+    /** */
+    private final MessageHandlers grpHandlers = new MessageHandlers();
 
     /** Stopping flag. */
     private boolean stopping;
@@ -144,17 +139,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             if (cacheMsg.partitionExchangeMessage()) {
                 if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
+                    GridDhtAffinityAssignmentRequest msg0 = (GridDhtAffinityAssignmentRequest)cacheMsg;
+
                     assert cacheMsg.topologyVersion() != null : cacheMsg;
 
                     AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order());
 
-                    DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId());
+                    CacheGroupDescriptor desc = cctx.cache().cacheGroupDescriptors().get(msg0.groupId());
 
-                    if (cacheDesc != null) {
-                        if (cacheDesc.startTopologyVersion() != null)
-                            startTopVer = cacheDesc.startTopologyVersion();
-                        else if (cacheDesc.receivedFromStartVersion() != null)
-                            startTopVer = cacheDesc.receivedFromStartVersion();
+                    if (desc != null) {
+                        if (desc.startTopologyVersion() != null)
+                            startTopVer = desc.startTopologyVersion();
+                        else if (desc.receivedFromStartVersion() != null)
+                            startTopVer = desc.receivedFromStartVersion();
                     }
 
                     // Need to wait for exchange to avoid race between cache start and affinity request.
@@ -165,7 +162,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                             log.debug("Wait for exchange before processing message [msg=" + msg +
                                 ", node=" + nodeId +
                                 ", waitVer=" + startTopVer +
-                                ", cacheDesc=" + cacheDesc + ']');
+                                ", cacheDesc=" + descriptorForMessage(cacheMsg) + ']');
                         }
 
                         fut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -260,21 +257,31 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      */
     @SuppressWarnings("unchecked")
     private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg) {
+        handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers);
+    }
+
+    /**
+     * @param nodeId Sender node ID.
+     * @param cacheMsg Message.
+     * @param msgHandlers Message handlers.
+     */
+    @SuppressWarnings("unchecked")
+    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers) {
         int msgIdx = cacheMsg.lookupIndex();
 
         IgniteBiInClosure<UUID, GridCacheMessage> c = null;
 
         if (msgIdx >= 0) {
-            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
+            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
 
-            IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.cacheId());
+            IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.handlerId());
 
             if (cacheClsHandlers != null)
                 c = cacheClsHandlers[msgIdx];
         }
 
         if (c == null)
-            c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass()));
+            c = msgHandlers.clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass()));
 
         if (c == null) {
             IgniteLogger log = cacheMsg.messageLogger(cctx);
@@ -285,12 +292,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             msg0.append(", locTopVer=").append(cctx.exchange().readyAffinityVersion()).
                 append(", msgTopVer=").append(cacheMsg.topologyVersion()).
-                append(", cacheDesc=").append(cctx.cache().cacheDescriptor(cacheMsg.cacheId())).
+                append(", desc=").append(descriptorForMessage(cacheMsg)).
                 append(']');
 
             msg0.append(U.nl()).append("Registered listeners:");
 
-            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
+            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
 
             for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers0.entrySet())
                 msg0.append(U.nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue()));
@@ -323,7 +330,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     @Override protected void onKernalStop0(boolean cancel) {
         cctx.gridIO().removeMessageListener(TOPIC_CACHE);
 
-        for (Object ordTopic : orderedHandlers.keySet())
+        for (Object ordTopic : cacheHandlers.orderedHandlers.keySet())
+            cctx.gridIO().removeMessageListener(ordTopic);
+
+        for (Object ordTopic : grpHandlers.orderedHandlers.keySet())
             cctx.gridIO().removeMessageListener(ordTopic);
 
         boolean interrupted = false;
@@ -520,14 +530,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      */
     private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c)
         throws IgniteCheckedException {
-        GridCacheContext ctx = cctx.cacheContext(msg.cacheId());
+        assert msg != null;
+
+        GridCacheContext ctx = msg instanceof GridCacheIdMessage ?
+            cctx.cacheContext(((GridCacheIdMessage)msg).cacheId()) : null;
 
         switch (msg.directType()) {
             case 30: {
                 GridDhtLockRequest req = (GridDhtLockRequest)msg;
 
                 GridDhtLockResponse res = new GridDhtLockResponse(
-                    ctx.cacheId(),
+                    req.cacheId(),
                     req.version(),
                     req.futureId(),
                     req.miniId(),
@@ -560,17 +573,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg;
 
                 GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
-                    ctx.cacheId(),
+                    req.cacheId(),
                     req.partition(),
                     req.futureId(),
-                    ctx.deploymentEnabled());
+                    false);
 
                 res.onError(req.classError());
 
                 sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
 
                 if (req.nearNodeId() != null) {
-                    GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
+                    GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
                         req.partition(),
                         req.nearFutureId(),
                         nodeId,
@@ -588,12 +601,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicFullUpdateRequest req = (GridNearAtomicFullUpdateRequest)msg;
 
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
-                    ctx.cacheId(),
+                    req.cacheId(),
                     nodeId,
                     req.futureId(),
                     req.partition(),
                     false,
-                    ctx.deploymentEnabled());
+                    false);
 
                 res.error(req.classError());
 
@@ -606,10 +619,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridDhtForceKeysRequest req = (GridDhtForceKeysRequest)msg;
 
                 GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
-                    ctx.cacheId(),
+                    req.cacheId(),
                     req.futureId(),
                     req.miniId(),
-                    ctx.deploymentEnabled()
+                    false
                 );
 
                 res.error(req.classError());
@@ -629,7 +642,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearGetRequest req = (GridNearGetRequest)msg;
 
                 GridNearGetResponse res = new GridNearGetResponse(
-                    ctx.cacheId(),
+                    req.cacheId(),
                     req.futureId(),
                     req.miniId(),
                     req.version(),
@@ -645,7 +658,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             case 50: {
                 GridNearGetResponse res = (GridNearGetResponse)msg;
 
-                CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());
+                CacheGetFuture fut = (CacheGetFuture)cctx.mvcc().future(res.futureId());
 
                 if (fut == null) {
                     if (log.isDebugEnabled())
@@ -665,7 +678,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearLockRequest req = (GridNearLockRequest)msg;
 
                 GridNearLockResponse res = new GridNearLockResponse(
-                    ctx.cacheId(),
+                    req.cacheId(),
                     req.version(),
                     req.futureId(),
                     req.miniId(),
@@ -673,7 +686,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     0,
                     req.classError(),
                     null,
-                    ctx.deploymentEnabled());
+                    false);
 
                 sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
             }
@@ -712,7 +725,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     cctx.deploymentEnabled());
 
                 cctx.io().sendOrderedMessage(
-                    ctx.node(nodeId),
+                    cctx.node(nodeId),
                     TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
                     res,
                     ctx.ioPolicy(),
@@ -731,7 +744,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg;
 
                 GridNearSingleGetResponse res = new GridNearSingleGetResponse(
-                    ctx.cacheId(),
+                    req.cacheId(),
                     req.futureId(),
                     req.topologyVersion(),
                     null,
@@ -748,7 +761,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             case 117: {
                 GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg;
 
-                GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc()
+                GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)cctx.mvcc()
                     .future(new IgniteUuid(IgniteUuid.VM_ID, res.futureId()));
 
                 if (fut == null) {
@@ -769,12 +782,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
 
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
-                    ctx.cacheId(),
+                    req.cacheId(),
                     nodeId,
                     req.futureId(),
                     req.partition(),
                     false,
-                    ctx.deploymentEnabled());
+                    false);
 
                 res.error(req.classError());
 
@@ -787,12 +800,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicSingleUpdateInvokeRequest req = (GridNearAtomicSingleUpdateInvokeRequest)msg;
 
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
-                    ctx.cacheId(),
+                    req.cacheId(),
                     nodeId,
                     req.futureId(),
                     req.partition(),
                     false,
-                    ctx.deploymentEnabled());
+                    false);
 
                 res.error(req.classError());
 
@@ -805,12 +818,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicSingleUpdateFilterRequest req = (GridNearAtomicSingleUpdateFilterRequest)msg;
 
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
-                    ctx.cacheId(),
+                    req.cacheId(),
                     nodeId,
                     req.futureId(),
                     req.partition(),
                     false,
-                    ctx.deploymentEnabled());
+                    false);
 
                 res.error(req.classError());
 
@@ -823,17 +836,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridDhtAtomicSingleUpdateRequest req = (GridDhtAtomicSingleUpdateRequest)msg;
 
                 GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
-                    ctx.cacheId(),
+                    req.cacheId(),
                     req.partition(),
                     req.futureId(),
-                    ctx.deploymentEnabled());
+                    false);
 
                 res.onError(req.classError());
 
                 sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
 
                 if (req.nearNodeId() != null) {
-                    GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
+                    GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
                         req.partition(),
                         req.nearFutureId(),
                         nodeId,
@@ -887,8 +900,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 if (txState != null)
                     txState.unwindEvicts(cctx);
             }
-            else {
-                GridCacheContext ctx = cctx.cacheContext(msg.cacheId());
+            else if (msg instanceof GridCacheIdMessage) {
+                GridCacheContext ctx = cctx.cacheContext(((GridCacheIdMessage)msg).cacheId());
 
                 if (ctx != null)
                     CU.unwindEvicts(ctx);
@@ -1180,79 +1193,125 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * Adds message handler.
-     *
-     * @param cacheId Cache ID.
+     * @param hndId Message handler ID.
      * @param type Type of message.
      * @param c Handler.
      */
-    @SuppressWarnings({"unchecked"})
-    public void addHandler(
-        int cacheId,
+    public void addCacheHandler(
+        int hndId,
         Class<? extends GridCacheMessage> type,
         IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+        assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type;
+
+        addHandler(hndId, type, c, cacheHandlers);
+    }
+
+    /**
+     * @param hndId Message handler ID.
+     * @param type Type of message.
+     * @param c Handler.
+     */
+    public void addCacheGroupHandler(
+        int hndId,
+        Class<? extends GridCacheGroupIdMessage> type,
+        IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+        assert !type.isAssignableFrom(GridCacheIdMessage.class) : type;
+
+        addHandler(hndId, type, c, grpHandlers);
+    }
+
+    /**
+     * @param hndId Message handler ID.
+     * @param type Type of message.
+     * @param c Handler.
+     * @param msgHandlers Message handlers.
+     */
+    @SuppressWarnings({"unchecked"})
+    private void addHandler(
+        int hndId,
+        Class<? extends GridCacheMessage> type,
+        IgniteBiInClosure<UUID, ? extends GridCacheMessage> c,
+        MessageHandlers msgHandlers) {
         int msgIdx = messageIndex(type);
 
         if (msgIdx != -1) {
-            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
+            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
 
-            IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheId);
+            IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(hndId);
 
             if (cacheClsHandlers == null) {
                 cacheClsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
 
-                idxClsHandlers0.put(cacheId, cacheClsHandlers);
+                idxClsHandlers0.put(hndId, cacheClsHandlers);
             }
 
             if (cacheClsHandlers[msgIdx] != null)
-                throw new IgniteException("Duplicate cache message ID found [cacheId=" + cacheId +
+                throw new IgniteException("Duplicate cache message ID found [hndId=" + hndId +
                     ", type=" + type + ']');
 
             cacheClsHandlers[msgIdx] = c;
 
-            idxClsHandlers = idxClsHandlers0;
+            msgHandlers.idxClsHandlers = idxClsHandlers0;
 
             return;
         }
         else {
-            ListenerKey key = new ListenerKey(cacheId, type);
+            ListenerKey key = new ListenerKey(hndId, type);
 
-            if (clsHandlers.putIfAbsent(key,
+            if (msgHandlers.clsHandlers.putIfAbsent(key,
                 (IgniteBiInClosure<UUID, GridCacheMessage>)c) != null)
-                assert false : "Handler for class already registered [cacheId=" + cacheId + ", cls=" + type +
-                    ", old=" + clsHandlers.get(key) + ", new=" + c + ']';
+                assert false : "Handler for class already registered [hndId=" + hndId + ", cls=" + type +
+                    ", old=" + msgHandlers.clsHandlers.get(key) + ", new=" + c + ']';
         }
 
         IgniteLogger log0 = log;
 
         if (log0 != null && log0.isTraceEnabled())
             log0.trace(
-                "Registered cache communication handler [cacheId=" + cacheId + ", type=" + type +
+                "Registered cache communication handler [hndId=" + hndId + ", type=" + type +
                     ", msgIdx=" + msgIdx + ", handler=" + c + ']');
     }
 
     /**
      * @param cacheId Cache ID to remove handlers for.
      */
-    public void removeHandlers(int cacheId) {
-        assert cacheId != 0;
+    void removeCacheHandlers(int cacheId) {
+        removeHandlers(cacheHandlers, cacheId);
+    }
+
+    /**
+     * @param grpId Cache group ID to remove handlers for.
+     */
+    void removeCacheGroupHandlers(int grpId) {
+        removeHandlers(grpHandlers, grpId);
+    }
+
+    /**
+     * @param msgHandlers Handlers.
+     * @param hndId ID to remove handlers for.
+     */
+    private void removeHandlers(MessageHandlers msgHandlers, int hndId) {
+        assert hndId != 0;
 
-        idxClsHandlers.remove(cacheId);
+        msgHandlers.idxClsHandlers.remove(hndId);
 
-        for (Iterator<ListenerKey> iter = clsHandlers.keySet().iterator(); iter.hasNext(); ) {
+        for (Iterator<ListenerKey> iter = msgHandlers.clsHandlers.keySet().iterator(); iter.hasNext(); ) {
             ListenerKey key = iter.next();
 
-            if (key.cacheId == cacheId)
+            if (key.hndId == hndId)
                 iter.remove();
         }
     }
 
     /**
-     * @param cacheId Cache ID to remove handlers for.
+     * @param cacheGrp {@code True} if cache group handler, {@code false} if cache handler.
+     * @param hndId Handler ID.
      * @param type Message type.
      */
-    public void removeHandler(int cacheId, Class<? extends GridCacheMessage> type) {
-        clsHandlers.remove(new ListenerKey(cacheId, type));
+    public void removeHandler(boolean cacheGrp, int hndId, Class<? extends GridCacheMessage> type) {
+        MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers;
+
+        msgHandlers.clsHandlers.remove(new ListenerKey(hndId, type));
     }
 
     /**
@@ -1274,16 +1333,35 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param topic Topic.
+     * @param c Handler.
+     */
+    public void addOrderedCacheHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheIdMessage> c) {
+        addOrderedHandler(false, topic, c);
+    }
+
+    /**
+     * @param topic Topic.
+     * @param c Handler.
+     */
+    public void addOrderedCacheGroupHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheGroupIdMessage> c) {
+        addOrderedHandler(true, topic, c);
+    }
+
+    /**
      * Adds ordered message handler.
      *
+     * @param cacheGrp {@code True} if cache group message, {@code false} if cache message.
      * @param topic Topic.
      * @param c Handler.
      */
     @SuppressWarnings({"unchecked"})
-    public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+    private void addOrderedHandler(boolean cacheGrp, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+        MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers;
+
         IgniteLogger log0 = log;
 
-        if (orderedHandlers.putIfAbsent(topic, c) == null) {
+        if (msgHandlers.orderedHandlers.putIfAbsent(topic, c) == null) {
             cctx.gridIO().addMessageListener(topic, new OrderedMessageListener(
                 (IgniteBiInClosure<UUID, GridCacheMessage>)c));
 
@@ -1298,10 +1376,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     /**
      * Removes ordered message handler.
      *
+     * @param cacheGrp {@code True} if cache group message, {@code false} if cache message.
      * @param topic Topic.
      */
-    public void removeOrderedHandler(Object topic) {
-        if (orderedHandlers.remove(topic) != null) {
+    public void removeOrderedHandler(boolean cacheGrp, Object topic) {
+        MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers;
+
+        if (msgHandlers.orderedHandlers.remove(topic) != null) {
             cctx.gridIO().removeMessageListener(topic);
 
             if (log != null && log.isDebugEnabled())
@@ -1352,12 +1433,43 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         }
     }
 
+    /**
+     * @param msg Message.
+     * @return Cache or group descriptor.
+     */
+    private Object descriptorForMessage(GridCacheMessage msg) {
+        if (msg instanceof GridCacheIdMessage)
+            return cctx.cache().cacheDescriptor(((GridCacheIdMessage)msg).cacheId());
+        else if (msg instanceof GridCacheGroupIdMessage)
+            return cctx.cache().cacheGroupDescriptors().get(((GridCacheGroupIdMessage)msg).groupId());
+
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         X.println(">>> ");
         X.println(">>> Cache IO manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']');
-        X.println(">>>   clsHandlersSize: " + clsHandlers.size());
-        X.println(">>>   orderedHandlersSize: " + orderedHandlers.size());
+        X.println(">>>   cacheClsHandlersSize: " + cacheHandlers.clsHandlers.size());
+        X.println(">>>   cacheOrderedHandlersSize: " + cacheHandlers.orderedHandlers.size());
+        X.println(">>>   cacheGrpClsHandlersSize: " + grpHandlers.clsHandlers.size());
+        X.println(">>>   cacheGrpOrderedHandlersSize: " + grpHandlers.orderedHandlers.size());
+    }
+
+    /**
+     *
+     */
+    static class MessageHandlers {
+        /** Indexed class handlers. */
+        volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>();
+
+        /** Handler registry. */
+        ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>
+            clsHandlers = new ConcurrentHashMap8<>();
+
+        /** Ordered handler registry. */
+        ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers =
+            new ConcurrentHashMap8<>();
     }
 
     /**
@@ -1391,17 +1503,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      */
     private static class ListenerKey {
         /** Cache ID. */
-        private int cacheId;
+        private int hndId;
 
         /** Message class. */
         private Class<? extends GridCacheMessage> msgCls;
 
         /**
-         * @param cacheId Cache ID.
+         * @param hndId Handler ID.
          * @param msgCls Message class.
          */
-        private ListenerKey(int cacheId, Class<? extends GridCacheMessage> msgCls) {
-            this.cacheId = cacheId;
+        private ListenerKey(int hndId, Class<? extends GridCacheMessage> msgCls) {
+            this.hndId = hndId;
             this.msgCls = msgCls;
         }
 
@@ -1415,12 +1527,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             ListenerKey that = (ListenerKey)o;
 
-            return cacheId == that.cacheId && msgCls.equals(that.msgCls);
+            return hndId == that.hndId && msgCls.equals(that.msgCls);
         }
 
         /** {@inheritDoc} */
         @Override public int hashCode() {
-            int res = cacheId;
+            int res = hndId;
 
             res = 31 * res + msgCls.hashCode();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
index db99272..63cfe1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
@@ -18,37 +18,65 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.concurrent.atomic.AtomicInteger;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
 
 /**
  * GridCacheConcurrentMap implementation for local and near caches.
  */
 public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl {
     /** */
-    private final AtomicInteger pubSize = new AtomicInteger();
+    private final int cacheId;
 
-    public GridCacheLocalConcurrentMap(GridCacheContext ctx,
-        GridCacheMapEntryFactory factory, int initialCapacity) {
-        super(ctx, factory, initialCapacity);
+    /** */
+    private final CacheMapHolder entryMap;
+
+    /**
+     * @param cctx Cache context.
+     * @param factory Entry factory.
+     * @param initCap Initial capacity.
+     */
+    public GridCacheLocalConcurrentMap(GridCacheContext cctx, GridCacheMapEntryFactory factory, int initCap) {
+        super(factory);
+
+        this.cacheId = cctx.cacheId();
+        this.entryMap = new CacheMapHolder(cctx,
+            new ConcurrentHashMap8<KeyCacheObject, GridCacheMapEntry>(initCap, 0.75f, Runtime.getRuntime().availableProcessors() * 2));
     }
 
-    public GridCacheLocalConcurrentMap(GridCacheContext ctx,
-        GridCacheMapEntryFactory factory, int initialCapacity, float loadFactor, int concurrencyLevel) {
-        super(ctx, factory, initialCapacity, loadFactor, concurrencyLevel);
+    /** {@inheritDoc} */
+    @Override public int internalSize() {
+        return entryMap.map.size();
     }
 
     /** {@inheritDoc} */
-    @Override public int publicSize() {
-        return pubSize.get();
+    @Nullable @Override protected CacheMapHolder entriesMap(GridCacheContext cctx) {
+        return entryMap;
     }
 
     /** {@inheritDoc} */
-    @Override public void incrementPublicSize(GridCacheEntryEx e) {
-        pubSize.incrementAndGet();
+    @Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) {
+        return entryMap;
     }
 
     /** {@inheritDoc} */
-    @Override public void decrementPublicSize(GridCacheEntryEx e) {
-        pubSize.decrementAndGet();
+    @Override public int publicSize(int cacheId) {
+        assert this.cacheId == cacheId;
+
+        return entryMap.size.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) {
+        assert cacheId == e.context().cacheId();
+
+        entryMap.size.incrementAndGet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) {
+        assert cacheId == e.context().cacheId();
+
+        entryMap.size.decrementAndGet();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 7c7fc99..01677a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -981,7 +981,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
                 deletedUnlocked(false);
 
-            updateCntr0 = nextPartCounter(topVer);
+            updateCntr0 = nextPartitionCounter(topVer, tx == null || tx.local(), updateCntr);
 
             if (updateCntr != null && updateCntr != 0)
                 updateCntr0 = updateCntr;
@@ -1160,7 +1160,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
             }
 
-            updateCntr0 = nextPartCounter(topVer);
+            updateCntr0 = nextPartitionCounter(topVer, tx == null || tx.local(), updateCntr);
 
             if (updateCntr != null && updateCntr != 0)
                 updateCntr0 = updateCntr;
@@ -1562,7 +1562,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 updateMetrics(op, metrics);
 
             if (lsnrCol != null) {
-                long updateCntr = nextPartCounter(AffinityTopologyVersion.NONE);
+                long updateCntr = nextPartitionCounter(AffinityTopologyVersion.NONE, true, null);
 
                 cctx.continuousQueries().onEntryUpdated(
                     lsnrCol,
@@ -1651,6 +1651,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()));
 
             c = new AtomicCacheUpdateClosure(this,
+                topVer,
                 newVer,
                 op,
                 writeObj,
@@ -1677,7 +1678,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 c.call(dataRow);
             }
             else
-                cctx.offheap().invoke(key, localPartition(), c);
+                cctx.offheap().invoke(cctx, key, localPartition(), c);
 
             GridCacheUpdateAtomicResult updateRes = c.updateRes;
 
@@ -1722,7 +1723,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         else
                             evtVal = (CacheObject)writeObj;
 
-                        long updateCntr0 = nextPartCounter();
+                        long updateCntr0 = nextPartitionCounter(topVer, primary, updateCntr);
 
                         if (updateCntr != null)
                             updateCntr0 = updateCntr;
@@ -2612,7 +2613,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 long updateCntr = 0;
 
                 if (!preload)
-                    updateCntr = nextPartCounter(topVer);
+                    updateCntr = nextPartitionCounter(topVer, true, null);
 
                 if (walEnabled) {
                     cctx.shared().wal().log(new DataRecord(new DataEntry(
@@ -2668,33 +2669,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /**
+     * @param topVer Topology version for current operation.
+     * @param primary Primary node update flag.
+     * @param primaryCntr Counter assigned on primary node.
      * @return Update counter.
      */
-    protected long nextPartCounter() {
+    protected long nextPartitionCounter(AffinityTopologyVersion topVer, boolean primary, @Nullable Long primaryCntr) {
         return 0;
     }
 
-    /**
-     * @param topVer Topology version.
-     * @return Update counter.
-     */
-    private long nextPartCounter(AffinityTopologyVersion topVer) {
-        long updateCntr;
-
-        if (!cctx.isLocal() && !isNear()) {
-            GridDhtLocalPartition locPart = cctx.topology().localPartition(partition(), topVer, false);
-
-            if (locPart == null)
-                return 0;
-
-            updateCntr = locPart.nextUpdateCounter();
-        }
-        else
-            updateCntr = 0;
-
-        return updateCntr;
-    }
-
     /** {@inheritDoc} */
     @Override public synchronized GridCacheVersionedEntryEx versionedEntry(final boolean keepBinary)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
@@ -3218,14 +3201,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @param oldRow Old row if available.
      * @throws IgniteCheckedException If update failed.
      */
-    protected void storeValue(@Nullable CacheObject val,
+    protected void storeValue(CacheObject val,
         long expireTime,
         GridCacheVersion ver,
         @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
         assert Thread.holdsLock(this);
         assert val != null : "null values in update for key: " + key;
 
-        cctx.offheap().invoke(key,  localPartition(), new UpdateClosure(this, val, ver, expireTime));
+        cctx.offheap().invoke(cctx, key,  localPartition(), new UpdateClosure(this, val, ver, expireTime));
     }
 
     /**
@@ -3267,7 +3250,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     protected void removeValue() throws IgniteCheckedException {
         assert Thread.holdsLock(this);
 
-        cctx.offheap().remove(key, partition(), localPartition());
+        cctx.offheap().remove(cctx, key, partition(), localPartition());
     }
 
     /** {@inheritDoc} */
@@ -3597,7 +3580,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         GridDhtLocalPartition locPart = localPartition();
 
         if (locPart != null)
-            locPart.incrementPublicSize(this);
+            locPart.incrementPublicSize(null, this);
         else
             cctx.incrementPublicSize(this);
     }
@@ -3609,7 +3592,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         GridDhtLocalPartition locPart = localPartition();
 
         if (locPart != null)
-            locPart.decrementPublicSize(this);
+            locPart.decrementPublicSize(null, this);
         else
             cctx.decrementPublicSize(this);
     }
@@ -3921,7 +3904,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (oldRow != null)
                 oldRow.key(entry.key);
 
-            newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(entry.key,
+            newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
+                entry.cctx,
+                entry.key,
                 val,
                 ver,
                 expireTime,
@@ -3955,6 +3940,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         private final GridCacheMapEntry entry;
 
         /** */
+        private final AffinityTopologyVersion topVer;
+
+        /** */
         private GridCacheVersion newVer;
 
         /** */
@@ -4017,7 +4005,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         /** */
         private CacheDataRow oldRow;
 
-        AtomicCacheUpdateClosure(GridCacheMapEntry entry,
+        AtomicCacheUpdateClosure(
+            GridCacheMapEntry entry,
+            AffinityTopologyVersion topVer,
             GridCacheVersion newVer,
             GridCacheOperation op,
             Object writeObj,
@@ -4038,6 +4028,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             assert op == UPDATE || op == DELETE || op == TRANSFORM : op;
 
             this.entry = entry;
+            this.topVer = topVer;
             this.newVer = newVer;
             this.op = op;
             this.writeObj = writeObj;
@@ -4273,7 +4264,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
 
             if (needUpdate) {
-                newRow = entry.localPartition().dataStore().createRow(entry.key,
+                newRow = entry.localPartition().dataStore().createRow(
+                    entry.cctx,
+                    entry.key,
                     storeLoadedVal,
                     newVer,
                     entry.expireTimeExtras(),
@@ -4404,7 +4397,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     ", locNodeId=" + cctx.localNodeId() + ']';
             }
 
-            long updateCntr0 = entry.nextPartCounter();
+            long updateCntr0 = entry.nextPartitionCounter(topVer, primary, updateCntr);
 
             if (updateCntr != null)
                 updateCntr0 = updateCntr;
@@ -4412,7 +4405,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
 
             if (!entry.isNear()) {
-                newRow = entry.localPartition().dataStore().createRow(entry.key,
+                newRow = entry.localPartition().dataStore().createRow(
+                    entry.cctx,
+                    entry.key,
                     updated,
                     newVer,
                     newExpireTime,
@@ -4486,7 +4481,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 // Must persist inside synchronization in non-tx mode.
                 cctx.store().remove(null, entry.key);
 
-            long updateCntr0 = entry.nextPartCounter();
+            long updateCntr0 = entry.nextPartitionCounter(topVer, primary, updateCntr);
 
             if (updateCntr != null)
                 updateCntr0 = updateCntr;

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 4de465c..11916e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -84,9 +84,15 @@ public abstract class GridCacheMessage implements Message {
     @GridDirectTransient
     private boolean skipPrepare;
 
-    /** Cache ID. */
-    @GridToStringInclude
-    protected int cacheId;
+    /**
+     * @return ID to distinguish message handlers for the same messages but for different caches/cache groups.
+     */
+    public abstract int handlerId();
+
+    /**
+     * @return {@code True} if cache group message.
+     */
+    public abstract boolean cacheGroupMessage();
 
     /**
      * @return Error, if any.
@@ -170,20 +176,6 @@ public abstract class GridCacheMessage implements Message {
     }
 
     /**
-     * @return Cache ID.
-     */
-    public int cacheId() {
-        return cacheId;
-    }
-
-    /**
-     * @param cacheId Cache ID.
-     */
-    public void cacheId(int cacheId) {
-        this.cacheId = cacheId;
-    }
-
-    /**
      * Gets topology version or -1 in case of topology version is not required for this message.
      *
      * @return Topology version.
@@ -205,6 +197,15 @@ public abstract class GridCacheMessage implements Message {
      * @throws IgniteCheckedException If failed.
      */
     protected final void prepareObject(@Nullable Object o, GridCacheContext ctx) throws IgniteCheckedException {
+        prepareObject(o, ctx.shared());
+    }
+
+    /**
+     * @param o Object to prepare for marshalling.
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected final void prepareObject(@Nullable Object o, GridCacheSharedContext ctx) throws IgniteCheckedException {
         assert addDepInfo || forceAddDepInfo;
 
         if (!skipPrepare && o != null) {
@@ -279,24 +280,28 @@ public abstract class GridCacheMessage implements Message {
     /**
      * @param info Entry to marshal.
      * @param ctx Context.
+     * @param cacheObjCtx Cache object context.
      * @throws IgniteCheckedException If failed.
      */
-    protected final void marshalInfo(GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+    protected final void marshalInfo(GridCacheEntryInfo info,
+        GridCacheSharedContext ctx,
+        CacheObjectContext cacheObjCtx
+    ) throws IgniteCheckedException {
         assert ctx != null;
 
         if (info != null) {
-            info.marshal(ctx);
+            info.marshal(cacheObjCtx);
 
             if (addDepInfo) {
                 if (info.key() != null)
-                    prepareObject(info.key().value(ctx.cacheObjectContext(), false), ctx);
+                    prepareObject(info.key().value(cacheObjCtx, false), ctx);
 
                 CacheObject val = info.value();
 
                 if (val != null) {
-                    val.finishUnmarshal(ctx.cacheObjectContext(), ctx.deploy().globalLoader());
+                    val.finishUnmarshal(cacheObjCtx, ctx.deploy().globalLoader());
 
-                    prepareObject(CU.value(val, ctx, false), ctx);
+                    prepareObject(val.value(cacheObjCtx, false), ctx);
                 }
             }
         }
@@ -314,7 +319,7 @@ public abstract class GridCacheMessage implements Message {
         assert ctx != null;
 
         if (info != null)
-            info.unmarshal(ctx, ldr);
+            info.unmarshal(ctx.cacheObjectContext(), ldr);
     }
 
     /**
@@ -324,13 +329,14 @@ public abstract class GridCacheMessage implements Message {
      */
     protected final void marshalInfos(
         Iterable<? extends GridCacheEntryInfo> infos,
-        GridCacheContext ctx
+        GridCacheSharedContext ctx,
+        CacheObjectContext cacheObjCtx
     ) throws IgniteCheckedException {
         assert ctx != null;
 
         if (infos != null)
             for (GridCacheEntryInfo e : infos)
-                marshalInfo(e, ctx);
+                marshalInfo(e, ctx, cacheObjCtx);
     }
 
     /**
@@ -369,14 +375,14 @@ public abstract class GridCacheMessage implements Message {
 
                 if (addDepInfo) {
                     if (e.key() != null)
-                        prepareObject(e.key().value(cctx.cacheObjectContext(), false), cctx);
+                        prepareObject(e.key().value(cctx.cacheObjectContext(), false), ctx);
 
                     if (e.value() != null)
-                        prepareObject(e.value().value(cctx.cacheObjectContext(), false), cctx);
+                        prepareObject(e.value().value(cctx.cacheObjectContext(), false), ctx);
 
                     if (e.entryProcessors() != null) {
                         for (T2<EntryProcessor<Object, Object, Object>, Object[]> entProc : e.entryProcessors())
-                            prepareObject(entProc.get1(), cctx);
+                            prepareObject(entProc.get1(), ctx);
                     }
                 }
                 else if (p2pEnabled && e.entryProcessors() != null) {
@@ -384,7 +390,7 @@ public abstract class GridCacheMessage implements Message {
                         forceAddDepInfo = true;
 
                     for (T2<EntryProcessor<Object, Object, Object>, Object[]> entProc : e.entryProcessors())
-                        prepareObject(entProc.get1(), cctx);
+                        prepareObject(entProc.get1(), ctx);
                 }
             }
         }
@@ -435,7 +441,7 @@ public abstract class GridCacheMessage implements Message {
             Object arg = args[i];
 
             if (addDepInfo)
-                prepareObject(arg, ctx);
+                prepareObject(arg, ctx.shared());
 
             argsBytes[i] = arg == null ? null : CU.marshal(ctx, arg);
         }
@@ -487,7 +493,7 @@ public abstract class GridCacheMessage implements Message {
 
         for (Object o : col) {
             if (addDepInfo)
-                prepareObject(o, ctx);
+                prepareObject(o, ctx.shared());
 
             byteCol.add(o == null ? null : CU.marshal(ctx, o));
         }
@@ -522,7 +528,7 @@ public abstract class GridCacheMessage implements Message {
             obj.prepareMarshal(ctx.cacheObjectContext());
 
             if (addDepInfo)
-                prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
+                prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx.shared());
         }
     }
 
@@ -541,7 +547,7 @@ public abstract class GridCacheMessage implements Message {
                 obj.prepareMarshal(ctx.cacheObjectContext());
 
                 if (addDepInfo)
-                    prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
+                    prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx.shared());
             }
         }
     }
@@ -630,6 +636,11 @@ public abstract class GridCacheMessage implements Message {
     }
 
     /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 
@@ -642,18 +653,12 @@ public abstract class GridCacheMessage implements Message {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeInt("cacheId", cacheId))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
                 if (!writer.writeMessage("depInfo", depInfo))
                     return false;
 
                 writer.incrementState();
 
-            case 2:
+            case 1:
                 if (!writer.writeLong("msgId", msgId))
                     return false;
 
@@ -673,14 +678,6 @@ public abstract class GridCacheMessage implements Message {
 
         switch (reader.state()) {
             case 0:
-                cacheId = reader.readInt("cacheId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
                 depInfo = reader.readMessage("depInfo");
 
                 if (!reader.isLastRead())
@@ -688,7 +685,7 @@ public abstract class GridCacheMessage implements Message {
 
                 reader.incrementState();
 
-            case 2:
+            case 1:
                 msgId = reader.readLong("msgId");
 
                 if (!reader.isLastRead())
@@ -714,6 +711,6 @@ public abstract class GridCacheMessage implements Message {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridCacheMessage.class, this, "cacheId", cacheId);
+        return S.toString(GridCacheMessage.class, this);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2eec8f6..2d1aa05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -94,7 +94,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
@@ -308,21 +307,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
             EVT_DISCOVERY_CUSTOM_EVT);
 
-        cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class,
+        cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class,
             new MessageHandler<GridDhtPartitionsSingleMessage>() {
                 @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
                     processSinglePartitionUpdate(node, msg);
                 }
             });
 
-        cctx.io().addHandler(0, GridDhtPartitionsFullMessage.class,
+        cctx.io().addCacheHandler(0, GridDhtPartitionsFullMessage.class,
             new MessageHandler<GridDhtPartitionsFullMessage>() {
                 @Override public void onMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
                     processFullPartitionUpdate(node, msg);
                 }
             });
 
-        cctx.io().addHandler(0, GridDhtPartitionsSingleRequest.class,
+        cctx.io().addCacheHandler(0, GridDhtPartitionsSingleRequest.class,
             new MessageHandler<GridDhtPartitionsSingleRequest>() {
                 @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleRequest msg) {
                     processSinglePartitionRequest(node, msg);
@@ -381,24 +380,28 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
                 final int idx = cnt;
 
-                cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() {
-                    @Override public void apply(final UUID id, final GridCacheMessage m) {
+                cctx.io().addOrderedCacheGroupHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheGroupIdMessage>() {
+                    @Override public void apply(final UUID id, final GridCacheGroupIdMessage m) {
                         if (!enterBusy())
                             return;
 
                         try {
-                            GridCacheContext cacheCtx = cctx.cacheContext(m.cacheId);
-
-                            if (cacheCtx != null) {
-                                if (m instanceof GridDhtPartitionSupplyMessage)
-                                    cacheCtx.preloader().handleSupplyMessage(
-                                        idx, id, (GridDhtPartitionSupplyMessage)m);
-                                else if (m instanceof GridDhtPartitionDemandMessage)
-                                    cacheCtx.preloader().handleDemandMessage(
-                                        idx, id, (GridDhtPartitionDemandMessage)m);
-                                else
-                                    U.error(log, "Unsupported message type: " + m.getClass().getName());
+                            CacheGroupContext grp = cctx.cache().cacheGroup(m.groupId());
+
+                            if (grp != null) {
+                                if (m instanceof GridDhtPartitionSupplyMessage) {
+                                    grp.preloader().handleSupplyMessage(idx, id, (GridDhtPartitionSupplyMessage) m);
+
+                                    return;
+                                }
+                                else if (m instanceof GridDhtPartitionDemandMessage) {
+                                    grp.preloader().handleDemandMessage(idx, id, (GridDhtPartitionDemandMessage) m);
+
+                                    return;
+                                }
                             }
+
+                            U.error(log, "Unsupported message type: " + m.getClass().getName());
                         }
                         finally {
                             leaveBusy();
@@ -416,14 +419,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     try {
                         fut.get();
 
-                        for (GridCacheContext cacheCtx : cctx.cacheContexts())
-                            cacheCtx.preloader().onInitialExchangeComplete(null);
+                        for (CacheGroupContext grp : cctx.cache().cacheGroups())
+                            grp.preloader().onInitialExchangeComplete(null);
 
                         reconnectExchangeFut.onDone();
                     }
                     catch (IgniteCheckedException e) {
-                        for (GridCacheContext cacheCtx : cctx.cacheContexts())
-                            cacheCtx.preloader().onInitialExchangeComplete(e);
+                        for (CacheGroupContext grp : cctx.cache().cacheGroups())
+                            grp.preloader().onInitialExchangeComplete(e);
 
                         reconnectExchangeFut.onDone(e);
                     }
@@ -468,9 +471,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             AffinityTopologyVersion nodeStartVer = new AffinityTopologyVersion(discoEvt.topologyVersion(), 0);
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (nodeStartVer.equals(cacheCtx.startTopologyVersion()))
-                    cacheCtx.preloader().onInitialExchangeComplete(null);
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (nodeStartVer.equals(grp.localStartVersion()))
+                    grp.preloader().onInitialExchangeComplete(null);
             }
 
             if (log.isDebugEnabled())
@@ -490,9 +493,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     @Override protected void onKernalStop0(boolean cancel) {
         cctx.gridEvents().removeDiscoveryEventListener(discoLsnr);
 
-        cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class);
-        cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
-        cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
+        cctx.io().removeHandler(false, 0, GridDhtPartitionsSingleMessage.class);
+        cctx.io().removeHandler(false, 0, GridDhtPartitionsFullMessage.class);
+        cctx.io().removeHandler(false, 0, GridDhtPartitionsSingleRequest.class);
 
         stopErr = cctx.kernalContext().clientDisconnected() ?
             new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
@@ -512,7 +515,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         if (!cctx.kernalContext().clientNode()) {
             for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++)
-                cctx.io().removeOrderedHandler(rebalanceTopic(cnt));
+                cctx.io().removeOrderedHandler(true, rebalanceTopic(cnt));
         }
 
         U.cancel(exchWorker);
@@ -547,22 +550,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param exchFut Exchange future.
      * @return Topology.
      */
-    public GridDhtPartitionTopology clientTopology(int cacheId, GridDhtPartitionsExchangeFuture exchFut) {
-        GridClientPartitionTopology top = clientTops.get(cacheId);
+    public GridDhtPartitionTopology clientTopology(int grpId, GridDhtPartitionsExchangeFuture exchFut) {
+        GridClientPartitionTopology top = clientTops.get(grpId);
 
         if (top != null)
             return top;
 
         Object affKey = null;
 
-        DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId);
+        CacheGroupDescriptor grpDesc = cctx.cache().cacheGroupDescriptors().get(grpId);
 
-        if (desc != null) {
-            CacheConfiguration ccfg = desc.cacheConfiguration();
+        if (grpDesc != null) {
+            CacheConfiguration<?, ?> ccfg = grpDesc.config();
 
             AffinityFunction aff = ccfg.getAffinity();
 
@@ -572,8 +575,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 aff.partitions());
         }
 
-        GridClientPartitionTopology old = clientTops.putIfAbsent(cacheId,
-            top = new GridClientPartitionTopology(cctx, cacheId, exchFut, affKey));
+        GridClientPartitionTopology old = clientTops.putIfAbsent(grpId,
+            top = new GridClientPartitionTopology(cctx, grpId, exchFut, affKey));
 
         return old != null ? old : top;
     }
@@ -586,11 +589,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return Client partition topology.
      */
-    public GridClientPartitionTopology clearClientTopology(int cacheId) {
-        return clientTops.remove(cacheId);
+    public GridClientPartitionTopology clearClientTopology(int grpId) {
+        return clientTops.remove(grpId);
     }
 
     /**
@@ -804,18 +807,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         // If this is the oldest node.
         if (oldest.id().equals(cctx.localNodeId())) {
             // Check rebalance state & send CacheAffinityChangeMessage if need.
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (!cacheCtx.isLocal()) {
-                    if (cacheCtx == null)
-                        continue;
-
-                    GridDhtPartitionTopology top = null;
-
-                    if (!cacheCtx.isLocal())
-                        top = cacheCtx.topology();
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (!grp.isLocal()) {
+                    GridDhtPartitionTopology top = grp.topology();
 
                     if (top != null)
-                        cctx.affinity().checkRebalanceState(top, cacheCtx.cacheId());
+                        cctx.affinity().checkRebalanceState(top, grp.groupId());
                 }
             }
 
@@ -825,7 +822,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             AffinityTopologyVersion rmtTopVer =
                 lastFut != null ? lastFut.topologyVersion() : AffinityTopologyVersion.NONE;
 
-            Collection<ClusterNode> rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE);
+            Collection<ClusterNode> rmts = CU.remoteNodes(cctx, rmtTopVer);
 
             if (log.isDebugEnabled())
                 log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
@@ -843,9 +840,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /**
      * @param nodes Nodes.
-     * @return {@code True} if message was sent, {@code false} if node left grid.
      */
-    private boolean sendAllPartitions(Collection<ClusterNode> nodes) {
+    private void sendAllPartitions(Collection<ClusterNode> nodes) {
         GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true);
 
         if (log.isDebugEnabled())
@@ -866,8 +862,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']');
             }
         }
-
-        return true;
     }
 
     /**
@@ -889,51 +883,48 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
 
-        cctx.forAllCaches(new IgniteInClosure<GridCacheContext>() {
-            @Override public void apply(GridCacheContext cacheCtx) {
-                if (!cacheCtx.isLocal()) {
-                    boolean ready;
-
-                    if (exchId != null) {
-                        AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
-
-                        ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
-                    }
-                    else
-                        ready = cacheCtx.started();
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (!grp.isLocal()) {
+                if (exchId != null) {
+                    AffinityTopologyVersion startTopVer = grp.localStartVersion();
 
-                    if (ready) {
-                        GridAffinityAssignmentCache affCache = cacheCtx.affinity().affinityCache();
+                    if (startTopVer.compareTo(exchId.topologyVersion()) > 0)
+                        continue;
+                }
 
-                        GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
+                GridAffinityAssignmentCache affCache = grp.affinity();
 
-                        addFullPartitionsMap(m,
-                            dupData,
-                            compress,
-                            cacheCtx.cacheId(),
-                            locMap,
-                            affCache.similarAffinityKey());
+                GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true);
 
-                        if (exchId != null)
-                            m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
-                    }
+                if (locMap != null) {
+                    addFullPartitionsMap(m,
+                        dupData,
+                        compress,
+                        grp.groupId(),
+                        locMap,
+                        affCache.similarAffinityKey());
                 }
+
+                if (exchId != null)
+                    m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
             }
-        });
+        }
 
         // It is important that client topologies be added after contexts.
         for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
             GridDhtPartitionFullMap map = top.partitionMap(true);
 
-            addFullPartitionsMap(m,
-                dupData,
-                compress,
-                top.cacheId(),
-                map,
-                top.similarAffinityKey());
+            if (map != null) {
+                addFullPartitionsMap(m,
+                    dupData,
+                    compress,
+                    top.groupId(),
+                    map,
+                    top.similarAffinityKey());
+            }
 
             if (exchId != null)
-                m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters(true));
+                m.addPartitionUpdateCounters(top.groupId(), top.updateCounters(true));
         }
 
         return m;
@@ -943,19 +934,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param m Message.
      * @param dupData Duplicated data map.
      * @param compress {@code True} if need check for duplicated partition state data.
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param map Map to add.
      * @param affKey Cache affinity key.
      */
     private void addFullPartitionsMap(GridDhtPartitionsFullMessage m,
         Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData,
         boolean compress,
-        Integer cacheId,
+        Integer grpId,
         GridDhtPartitionFullMap map,
         Object affKey) {
+        assert map != null;
+
         Integer dupDataCache = null;
 
-        if (compress && affKey != null && !m.containsCache(cacheId)) {
+        if (compress && affKey != null && !m.containsGroup(grpId)) {
             T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey);
 
             if (state0 != null && state0.get2().partitionStateEquals(map)) {
@@ -971,10 +964,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 dupDataCache = state0.get1();
             }
             else
-                dupData.put(affKey, new T2<>(cacheId, map));
+                dupData.put(affKey, new T2<>(grpId, map));
         }
 
-        m.addFullPartitionsMap(cacheId, map, dupDataCache);
+        m.addFullPartitionsMap(grpId, map, dupDataCache);
     }
 
     /**
@@ -1022,24 +1015,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>();
 
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal()) {
-                GridDhtPartitionMap locMap = cacheCtx.topology().localPartitionMap();
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (!grp.isLocal()) {
+                GridDhtPartitionMap locMap = grp.topology().localPartitionMap();
 
                 addPartitionMap(m,
                     dupData,
                     true,
-                    cacheCtx.cacheId(),
+                    grp.groupId(),
                     locMap,
-                    cacheCtx.affinity().affinityCache().similarAffinityKey());
+                    grp.affinity().similarAffinityKey());
 
                 if (sndCounters)
-                    m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
+                    m.partitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
             }
         }
 
         for (GridClientPartitionTopology top : clientTops.values()) {
-            if (m.partitions() != null && m.partitions().containsKey(top.cacheId()))
+            if (m.partitions() != null && m.partitions().containsKey(top.groupId()))
                 continue;
 
             GridDhtPartitionMap locMap = top.localPartitionMap();
@@ -1047,12 +1040,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             addPartitionMap(m,
                 dupData,
                 true,
-                top.cacheId(),
+                top.groupId(),
                 locMap,
                 top.similarAffinityKey());
 
             if (sndCounters)
-                m.partitionUpdateCounters(top.cacheId(), top.updateCounters(true));
+                m.partitionUpdateCounters(top.groupId(), top.updateCounters(true));
         }
 
         return m;
@@ -1250,19 +1243,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 boolean updated = false;
 
                 for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
-                    Integer cacheId = entry.getKey();
+                    Integer grpId = entry.getKey();
 
-                    GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
-                    if (cacheCtx != null && !cacheCtx.started())
-                        continue; // Can safely ignore background exchange.
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
                     GridDhtPartitionTopology top = null;
 
-                    if (cacheCtx == null)
-                        top = clientTops.get(cacheId);
-                    else if (!cacheCtx.isLocal())
-                        top = cacheCtx.topology();
+                    if (grp == null)
+                        top = clientTops.get(grpId);
+                    else if (!grp.isLocal())
+                        top = grp.topology();
 
                     if (top != null)
                         updated |= top.update(null, entry.getValue(), null) != null;
@@ -1296,25 +1286,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 boolean updated = false;
 
                 for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
-                    Integer cacheId = entry.getKey();
+                    Integer grpId = entry.getKey();
 
-                    GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                    if (cacheCtx != null &&
-                        cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
+                    if (grp != null &&
+                        grp.localStartVersion().compareTo(entry.getValue().topologyVersion()) > 0)
                         continue;
 
                     GridDhtPartitionTopology top = null;
 
-                    if (cacheCtx == null)
-                        top = clientTops.get(cacheId);
-                    else if (!cacheCtx.isLocal())
-                        top = cacheCtx.topology();
+                    if (grp == null)
+                        top = clientTops.get(grpId);
+                    else if (!grp.isLocal())
+                        top = grp.topology();
 
                     if (top != null) {
                         updated |= top.update(null, entry.getValue()) != null;
 
-                        cctx.affinity().checkRebalanceState(top, cacheId);
+                        cctx.affinity().checkRebalanceState(top, grpId);
                     }
                 }
 
@@ -1404,8 +1394,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         dumpPendingObjects(exchTopVer);
 
-        for (GridCacheContext cacheCtx : cctx.cacheContexts())
-            cacheCtx.preloader().dumpDebugInfo();
+        for (CacheGroupContext grp : cctx.cache().cacheGroups())
+            grp.preloader().dumpDebugInfo();
 
         cctx.affinity().dumpDebugInfo();
 
@@ -1568,21 +1558,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
         }
 
-        for (GridCacheContext ctx : cctx.cacheContexts()) {
-            if (ctx.isLocal())
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
                 continue;
 
-            GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
-
-            GridCachePreloader preloader = ctx0.preloader();
+            GridCachePreloader preloader = grp.preloader();
 
             if (preloader != null)
                 preloader.dumpDebugInfo();
 
-            GridCacheAffinityManager affMgr = ctx0.affinity();
+            GridAffinityAssignmentCache aff = grp.affinity();
 
-            if (affMgr != null)
-                affMgr.dumpDebugInfo();
+            if (aff != null)
+                aff.dumpDebugInfo();
         }
     }
 
@@ -1725,8 +1713,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 try {
                     boolean preloadFinished = true;
 
-                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                        preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone();
+                    for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                        if (grp.isLocal())
+                            continue;
+
+                        preloadFinished &= grp.preloader() != null && grp.preloader().syncFuture().isDone();
 
                         if (!preloadFinished)
                             break;
@@ -1833,11 +1824,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                             boolean changed = false;
 
-                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                                if (cacheCtx.isLocal())
+                            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                                if (grp.isLocal())
                                     continue;
 
-                                changed |= cacheCtx.topology().afterExchange(exchFut);
+                                changed |= grp.topology().afterExchange(exchFut);
                             }
 
                             if (!cctx.kernalContext().clientNode() && changed && !hasPendingExchange())
@@ -1857,16 +1848,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         if (!exchFut.skipPreload() && cctx.kernalContext().state().active()) {
                             assignsMap = new HashMap<>();
 
-                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                                long delay = cacheCtx.config().getRebalanceDelay();
+                            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                                long delay = grp.config().getRebalanceDelay();
 
                                 GridDhtPreloaderAssignments assigns = null;
 
                                 // Don't delay for dummy reassigns to avoid infinite recursion.
                                 if (delay == 0 || forcePreload)
-                                    assigns = cacheCtx.preloader().assign(exchFut);
+                                    assigns = grp.preloader().assign(exchFut);
 
-                                assignsMap.put(cacheCtx.cacheId(), assigns);
+                                assignsMap.put(grp.groupId(), assigns);
                             }
                         }
                     }
@@ -1881,16 +1872,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
 
                         for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
-                            int cacheId = e.getKey();
+                            int grpId = e.getKey();
 
-                            GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+                            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                            int order = cacheCtx.config().getRebalanceOrder();
+                            int order = grp.config().getRebalanceOrder();
 
                             if (orderMap.get(order) == null)
                                 orderMap.put(order, new ArrayList<Integer>(size));
 
-                            orderMap.get(order).add(cacheId);
+                            orderMap.get(order).add(grpId);
                         }
 
                         Runnable r = null;
@@ -1900,35 +1891,27 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         boolean assignsCancelled = false;
 
                         for (Integer order : orderMap.descendingKeySet()) {
-                            for (Integer cacheId : orderMap.get(order)) {
-                                GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+                            for (Integer grpId : orderMap.get(order)) {
+                                CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                                GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
+                                GridDhtPreloaderAssignments assigns = assignsMap.get(grpId);
 
                                 if (assigns != null)
                                     assignsCancelled |= assigns.cancelled();
 
-                                List<String> waitList = new ArrayList<>(size - 1);
-
-                                for (List<Integer> cIds : orderMap.headMap(order).values()) {
-                                    for (Integer cId : cIds)
-                                        waitList.add(cctx.cacheContext(cId).name());
-                                }
-
                                 // Cancels previous rebalance future (in case it's not done yet).
                                 // Sends previous rebalance stopped event (if necessary).
                                 // Creates new rebalance future.
                                 // Sends current rebalance started event (if necessary).
                                 // Finishes cache sync future (on empty assignments).
-                                Runnable cur = cacheCtx.preloader().addAssignments(assigns,
+                                Runnable cur = grp.preloader().addAssignments(assigns,
                                     forcePreload,
-                                    waitList,
                                     cnt,
                                     r,
                                     exchFut.forcedRebalanceFuture());
 
                                 if (cur != null) {
-                                    rebList.add(U.maskName(cacheCtx.name()));
+                                    rebList.add(grp.cacheOrGroupName());
 
                                     r = cur;
                                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 5ae68e8..0ac0272 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -22,6 +22,7 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -45,18 +46,6 @@ public interface GridCachePreloader {
     public void start() throws IgniteCheckedException;
 
     /**
-     * Stops preloading.
-     */
-    public void stop();
-
-    /**
-     * Kernal start callback.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void onKernalStart() throws IgniteCheckedException;
-
-    /**
      * Kernal stop callback.
      */
     public void onKernalStop();
@@ -90,7 +79,6 @@ public interface GridCachePreloader {
      */
     public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
         boolean forcePreload,
-        Collection<String> caches,
         int cnt,
         Runnable next,
         @Nullable GridFutureAdapter<Boolean> forcedRebFut);
@@ -136,20 +124,25 @@ public interface GridCachePreloader {
     /**
      * Requests that preloader sends the request for the key.
      *
+     * @param cctx Cache context.
      * @param keys Keys to request.
      * @param topVer Topology version, {@code -1} if not required.
      * @return Future to complete when all keys are preloaded.
      */
-    public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer);
+    public GridDhtFuture<Object> request(GridCacheContext cctx,
+        Collection<KeyCacheObject> keys,
+        AffinityTopologyVersion topVer);
 
     /**
      * Requests that preloader sends the request for the key.
      *
+     * @param cctx Cache context.
      * @param req Message with keys to request.
      * @param topVer Topology version, {@code -1} if not required.
      * @return Future to complete when all keys are preloaded.
      */
-    public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req,
+    public GridDhtFuture<Object> request(GridCacheContext cctx,
+        GridNearAtomicAbstractUpdateRequest req,
         AffinityTopologyVersion topVer);
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 47c37f5..98874e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -21,9 +21,9 @@ import java.util.Collection;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -39,15 +39,15 @@ import org.jetbrains.annotations.Nullable;
  * Adapter for preloading which always assumes that preloading finished.
  */
 public class GridCachePreloaderAdapter implements GridCachePreloader {
-    /** Cache context. */
-    protected final GridCacheContext<?, ?> cctx;
+    /** */
+    protected final CacheGroupContext grp;
+
+    /** */
+    protected final GridCacheSharedContext ctx;
 
     /** Logger. */
     protected final IgniteLogger log;
 
-    /** Affinity. */
-    protected final AffinityFunction aff;
-
     /** Start future (always completed by default). */
     private final IgniteInternalFuture finFut;
 
@@ -55,15 +55,16 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     protected IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
     /**
-     * @param cctx Cache context.
+     * @param grp Cache group.
      */
-    public GridCachePreloaderAdapter(GridCacheContext<?, ?> cctx) {
-        assert cctx != null;
+    public GridCachePreloaderAdapter(CacheGroupContext grp) {
+        assert grp != null;
 
-        this.cctx = cctx;
+        this.grp = grp;
 
-        log = cctx.logger(getClass());
-        aff = cctx.config().getAffinity();
+        ctx = grp.shared();
+
+        log = ctx.logger(getClass());
 
         finFut = new GridFinishedFuture();
     }
@@ -74,16 +75,6 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public void stop() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Override public void onKernalStop() {
         // No-op.
     }
@@ -130,7 +121,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
 
     /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
-        cctx.deploy().unwind(cctx);
+        grp.unwindUndeploys();
     }
 
     /** {@inheritDoc} */
@@ -144,15 +135,15 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys,
+    @Override public GridDhtFuture<Object> request(GridCacheContext ctx, Collection<KeyCacheObject> keys,
         AffinityTopologyVersion topVer) {
-        return new GridFinishedFuture<>();
+        return null;
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req,
+    @Override public GridDhtFuture<Object> request(GridCacheContext ctx, GridNearAtomicAbstractUpdateRequest req,
         AffinityTopologyVersion topVer) {
-        return new GridFinishedFuture<>();
+        return null;
     }
 
     /** {@inheritDoc} */
@@ -168,7 +159,6 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     /** {@inheritDoc} */
     @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
         boolean forcePreload,
-        Collection<String> caches,
         int cnt,
         Runnable next,
         @Nullable GridFutureAdapter<Boolean> forcedRebFut) {


Mime
View raw message