ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [09/18] ignite git commit: ignite-5075 'logical' caches sharing the same 'physical' cache group
Date Sun, 04 Jun 2017 08:03:08 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index ec9dbbf..791bac0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -40,9 +40,9 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.ClusterState;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@ -68,7 +68,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 /**
  * Partition topology.
  */
-@GridToStringExclude class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
+@GridToStringExclude
+public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** If true, then check consistency. */
     private static final boolean CONSISTENCY_CHECK = false;
 
@@ -78,8 +79,11 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     /** */
     private static final Long ZERO = 0L;
 
-    /** Context. */
-    private final GridCacheContext<?, ?> cctx;
+    /** Cache shared context. */
+    private final GridCacheSharedContext ctx;
+
+    /** Cache group context. */
+    private final CacheGroupContext grp;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -114,9 +118,6 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     /** Lock. */
     private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16);
 
-    /** */
-    private final GridCacheMapEntryFactory entryFactory;
-
     /** Partition update counter. */
     private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>();
 
@@ -127,25 +128,27 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     private volatile boolean treatAllPartAsLoc;
 
     /**
-     * @param cctx Context.
-     * @param entryFactory Entry factory.
+     * @param ctx Cache shared context.
+     * @param grp Cache group.
      */
-    GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx, GridCacheMapEntryFactory entryFactory) {
-        assert cctx != null;
+    public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx,
+        CacheGroupContext grp) {
+        assert ctx != null;
+        assert grp != null;
 
-        this.cctx = cctx;
-        this.entryFactory = entryFactory;
+        this.ctx = ctx;
+        this.grp = grp;
 
-        log = cctx.logger(getClass());
+        log = ctx.logger(getClass());
 
-        locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions());
+        locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions());
 
-        part2node = new HashMap<>(cctx.config().getAffinity().partitions(), 1.0f);
+        part2node = new HashMap<>(grp.affinityFunction().partitions(), 1.0f);
     }
 
     /** {@inheritDoc} */
-    @Override public int cacheId() {
-        return cctx.cacheId();
+    @Override public int groupId() {
+        return grp.groupId();
     }
 
     /**
@@ -169,7 +172,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             topVer = AffinityTopologyVersion.NONE;
 
-            discoCache = cctx.discovery().discoCache();
+            discoCache = ctx.discovery().discoCache();
         }
         finally {
             lock.writeLock().unlock();
@@ -240,7 +243,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         AffinityTopologyVersion topVer = this.topVer;
 
         assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
-            ", cacheName=" + cctx.name() + ']';
+            ", group=" + grp.cacheOrGroupName() + ']';
 
         return topVer;
     }
@@ -282,7 +285,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
      * @param updateSeq Update sequence.
      */
     private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
-        ClusterNode loc = cctx.localNode();
+        ClusterNode loc = ctx.localNode();
 
         ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
@@ -290,23 +293,23 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
         assert topVer.equals(exchFut.topologyVersion()) :
             "Invalid topology [topVer=" + topVer +
-                ", cache=" + cctx.name() +
+                ", grp=" + grp.cacheOrGroupName() +
                 ", futVer=" + exchFut.topologyVersion() +
                 ", fut=" + exchFut + ']';
-        assert cctx.affinity().affinityTopologyVersion().equals(exchFut.topologyVersion()) :
-            "Invalid affinity [topVer=" + cctx.affinity().affinityTopologyVersion() +
-                ", cache=" + cctx.name() +
+        assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) :
+            "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
+                ", grp=" + grp.cacheOrGroupName() +
                 ", futVer=" + exchFut.topologyVersion() +
                 ", fut=" + exchFut + ']';
 
-        List<List<ClusterNode>> aff = cctx.affinity().assignments(exchFut.topologyVersion());
+        List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion());
 
-        int num = cctx.affinity().partitions();
+        int num = grp.affinity().partitions();
 
-        if (cctx.rebalanceEnabled()) {
-            boolean added = exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom());
+        if (grp.rebalanceEnabled()) {
+            boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
 
-            boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
+            boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined());
 
             if (first) {
                 assert exchId.isJoined() || added;
@@ -317,7 +320,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
                         boolean owned = locPart.own();
 
-                        assert owned : "Failed to own partition for oldest node [cacheName" + cctx.name() +
+                        assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() +
                             ", part=" + locPart + ']';
 
                         if (log.isDebugEnabled())
@@ -376,7 +379,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
      * @param updateSeq Update sequence.
      */
     private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) {
-        int num = cctx.affinity().partitions();
+        int num = grp.affinity().partitions();
 
         for (int p = 0; p < num; p++) {
             if (node2part != null && node2part.valid()) {
@@ -398,24 +401,23 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     /** {@inheritDoc} */
     @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady)
         throws IgniteCheckedException {
-
         DiscoveryEvent discoEvt = exchFut.discoveryEvent();
 
         ClusterState newState = exchFut.newClusterState();
 
         treatAllPartAsLoc = (newState != null && newState == ClusterState.ACTIVE)
-            || (cctx.kernalContext().state().active()
+            || (ctx.kernalContext().state().active()
             && discoEvt.type() == EventType.EVT_NODE_JOINED
             && discoEvt.eventNode().isLocal()
-            && !cctx.kernalContext().clientNode()
+            && !ctx.kernalContext().clientNode()
         );
 
-        ClusterNode loc = cctx.localNode();
+        ClusterNode loc = ctx.localNode();
 
-        cctx.shared().database().checkpointReadLock();
+        ctx.database().checkpointReadLock();
 
         try {
-            synchronized (cctx.shared().exchange().interruptLock()) {
+            synchronized (ctx.exchange().interruptLock()) {
                 if (Thread.currentThread().isInterrupted())
                     throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread());
 
@@ -443,7 +445,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                     cntrMap.clear();
 
                     // If this is the oldest node.
-                    if (oldest != null && (loc.equals(oldest) || exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()))) {
+                    if (oldest != null && (loc.equals(oldest) || exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))) {
                         if (node2part == null) {
                             node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -470,7 +472,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                     if (affReady)
                         initPartitions0(exchFut, updateSeq);
                     else {
-                        List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
+                        List<List<ClusterNode>> aff = grp.affinity().idealAssignment();
 
                         createPartitions(aff, updateSeq);
                     }
@@ -487,23 +489,32 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             }
         }
         finally {
-            cctx.shared().database().checkpointReadUnlock();
+            ctx.database().checkpointReadUnlock();
         }
     }
 
+    /**
+     * @param p Partition number.
+     * @param topVer Topology version.
+     * @return {@code True} if the local node is one of the affinity nodes of the given partition.
+     */
+    private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) {
+        return grp.affinity().nodes(p, topVer).contains(ctx.localNode());
+    }
+
     /** {@inheritDoc} */
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
         treatAllPartAsLoc = false;
 
         boolean changed = false;
 
-        int num = cctx.affinity().partitions();
+        int num = grp.affinity().partitions();
 
         AffinityTopologyVersion topVer = exchFut.topologyVersion();
 
-        assert cctx.affinity().affinityTopologyVersion().equals(topVer) : "Affinity is not initialized " +
+        assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not initialized " +
             "[topVer=" + topVer +
-            ", affVer=" + cctx.affinity().affinityTopologyVersion() +
+            ", affVer=" + grp.affinity().lastVersion() +
             ", fut=" + exchFut + ']';
 
         lock.writeLock().lock();
@@ -524,7 +535,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             for (int p = 0; p < num; p++) {
                 GridDhtLocalPartition locPart = localPartition0(p, topVer, false, false, false);
 
-                if (cctx.affinity().partitionLocalNode(p, topVer)) {
+                if (partitionLocalNode(p, topVer)) {
                     // This partition will be created during next topology event,
                     // which obviously has not happened at this point.
                     if (locPart == null) {
@@ -537,26 +548,28 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                     GridDhtPartitionState state = locPart.state();
 
                     if (state == MOVING) {
-                        if (cctx.rebalanceEnabled()) {
+                        if (grp.rebalanceEnabled()) {
                             Collection<ClusterNode> owners = owners(p);
 
                             // If there are no other owners, then become an owner.
                             if (F.isEmpty(owners)) {
                                 boolean owned = locPart.own();
 
-                                assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" +
+                                assert owned : "Failed to own partition [grp=" + grp.cacheOrGroupName() + ", locPart=" +
                                     locPart + ']';
 
                                 updateSeq = updateLocal(p, locPart.state(), updateSeq);
 
                                 changed = true;
 
-                                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                                if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
                                     DiscoveryEvent discoEvt = exchFut.discoveryEvent();
 
-                                    cctx.events().addPreloadEvent(p,
-                                        EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
-                                        discoEvt.type(), discoEvt.timestamp());
+                                    grp.addRebalanceEvent(p,
+                                        EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                                        discoEvt.eventNode(),
+                                        discoEvt.type(),
+                                        discoEvt.timestamp());
                                 }
 
                                 if (log.isDebugEnabled())
@@ -574,7 +587,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                     if (locPart != null) {
                         GridDhtPartitionState state = locPart.state();
 
-                        if (state == MOVING && cctx.kernalContext().state().active()) {
+                        if (state == MOVING && ctx.kernalContext().state().active()) {
                             locPart.rent(false);
 
                             updateSeq = updateLocal(p, locPart.state(), updateSeq);
@@ -588,7 +601,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 }
             }
 
-            updateRebalanceVersion(cctx.affinity().assignments(topVer));
+            updateRebalanceVersion(grp.affinity().assignments(topVer));
 
             consistencyCheck();
         }
@@ -622,11 +635,11 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         GridDhtLocalPartition loc = locParts.get(p);
 
         if (loc == null || loc.state() == EVICTED) {
-            locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory));
+            locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
 
-            if (cctx.shared().pageStore() != null) {
+            if (ctx.pageStore() != null) {
                 try {
-                    cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p);
+                    ctx.pageStore().onPartitionCreated(grp.groupId(), p);
                 }
                 catch (IgniteCheckedException e) {
                     // TODO ignite-db
@@ -672,7 +685,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             state = loc != null ? loc.state() : null;
 
-            boolean belongs = cctx.affinity().partitionLocalNode(p, topVer);
+            boolean belongs = partitionLocalNode(p, topVer);
 
             if (loc != null && state == EVICTED) {
                 locParts.set(p, loc = null);
@@ -692,7 +705,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                         "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
 
-                locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory));
+                locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
 
                 if (updateSeq)
                     this.updateSeq.incrementAndGet();
@@ -707,9 +720,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             lock.writeLock().unlock();
         }
 
-        if (created && cctx.shared().pageStore() != null) {
+        if (created && ctx.pageStore() != null) {
             try {
-                cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p);
+                // TODO IGNITE-5075.
+                ctx.pageStore().onPartitionCreated(grp.groupId(), p);
             }
             catch (IgniteCheckedException e) {
                 // TODO ignite-db
@@ -734,8 +748,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
-        return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create);
+    @Override public GridDhtLocalPartition localPartition(int part) {
+        return locParts.get(part);
     }
 
     /** {@inheritDoc} */
@@ -791,7 +805,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 map.put(i, part.state());
             }
 
-            return new GridDhtPartitionMap(cctx.nodeId(),
+            return new GridDhtPartitionMap(ctx.localNodeId(),
                 updateSeq.get(),
                 topVer,
                 Collections.unmodifiableMap(map),
@@ -831,7 +845,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
-        AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
+        AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer);
 
         List<ClusterNode> affNodes = affAssignment.get(p);
 
@@ -854,8 +868,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         try {
             assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer +
                 ", topVer2=" + this.topVer +
-                ", node=" + cctx.igniteInstanceName() +
-                ", cache=" + cctx.name() +
+                ", node=" + ctx.igniteInstanceName() +
+                ", grp=" + grp.cacheOrGroupName() +
                 ", node2part=" + node2part + ']';
 
             List<ClusterNode> nodes = null;
@@ -867,7 +881,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                     HashSet<UUID> affIds = affAssignment.getIds(p);
 
                     if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING)) {
-                        ClusterNode n = cctx.discovery().node(nodeId);
+                        ClusterNode n = ctx.discovery().node(nodeId);
 
                         if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
                             if (nodes == null) {
@@ -900,7 +914,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         AffinityTopologyVersion topVer,
         GridDhtPartitionState state,
         GridDhtPartitionState... states) {
-        Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null;
+        Collection<UUID> allIds = topVer.topologyVersion() > 0 ?
+            F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId())) :
+            null;
 
         lock.readLock().lock();
 
@@ -908,7 +924,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
                 ", allIds=" + allIds +
                 ", node2part=" + node2part +
-                ", cache=" + cctx.name() + ']';
+                ", grp=" + grp.cacheOrGroupName() + ']';
 
             Collection<UUID> nodeIds = part2node.get(p);
 
@@ -925,7 +941,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                     continue;
 
                 if (hasState(p, id, state, states)) {
-                    ClusterNode n = cctx.discovery().node(id);
+                    ClusterNode n = ctx.discovery().node(id);
 
                     if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
                         nodes.add(n);
@@ -941,7 +957,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) {
-        if (!cctx.rebalanceEnabled())
+        if (!grp.rebalanceEnabled())
             return ownersAndMoving(p, topVer);
 
         return nodes(p, topVer, OWNING);
@@ -954,7 +970,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> moving(int p) {
-        if (!cctx.rebalanceEnabled())
+        if (!grp.rebalanceEnabled())
             return ownersAndMoving(p, AffinityTopologyVersion.NONE);
 
         return nodes(p, AffinityTopologyVersion.NONE, MOVING);
@@ -979,12 +995,14 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         lock.readLock().lock();
 
         try {
-            assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
-                ", cache=" + cctx.name() +
-                ", started=" + cctx.started() +
+            if (node2part == null || stopping)
+                return null;
+
+            assert node2part.valid() : "Invalid node2part [node2part=" + node2part +
+                ", grp=" + grp.cacheOrGroupName() +
                 ", stopping=" + stopping +
-                ", locNodeId=" + cctx.localNode().id() +
-                ", locName=" + cctx.igniteInstanceName() + ']';
+                ", locNodeId=" + ctx.localNode().id() +
+                ", locName=" + ctx.igniteInstanceName() + ']';
 
             GridDhtPartitionFullMap m = node2part;
 
@@ -1067,7 +1085,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                     // then we keep the newer value.
                     if (newPart != null &&
                         (newPart.updateSequence() < part.updateSequence() ||
-                        (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+                        (grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0))
                         ) {
                         if (log.isDebugEnabled())
                             log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
@@ -1081,7 +1099,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
                     UUID nodeId = it.next();
 
-                    if (!cctx.discovery().alive(nodeId)) {
+                    if (!ctx.discovery().alive(nodeId)) {
                         if (log.isDebugEnabled())
                             log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" +
                                 partMap + ']');
@@ -1115,11 +1133,11 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             boolean changed = false;
 
-            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion affVer = grp.affinity().lastVersion();
 
-            GridDhtPartitionMap nodeMap = partMap.get(cctx.localNodeId());
+            GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId());
 
-            if (nodeMap != null && cctx.shared().database().persistenceEnabled()) {
+            if (nodeMap != null && ctx.database().persistenceEnabled()) {
                 for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) {
                     int p = e.getKey();
                     GridDhtPartitionState state = e.getValue();
@@ -1188,7 +1206,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             long updateSeq = this.updateSeq.incrementAndGet();
 
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
-                List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+                List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
 
                 changed |= checkEvictions(updateSeq, aff);
 
@@ -1201,7 +1219,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 log.debug("Partition map after full update: " + fullMapString());
 
             if (changed)
-                cctx.shared().exchange().scheduleResendPartitions();
+                ctx.exchange().scheduleResendPartitions();
 
             return changed ? localPartitionMap() : null;
         }
@@ -1254,7 +1272,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
 
-        if (!cctx.discovery().alive(parts.nodeId())) {
+        if (!ctx.discovery().alive(parts.nodeId())) {
             if (log.isDebugEnabled())
                 log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
                     ", parts=" + parts + ']');
@@ -1334,10 +1352,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 }
             }
 
-            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion affVer = grp.affinity().lastVersion();
 
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
-                List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+                List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
 
                 changed |= checkEvictions(updateSeq, aff);
 
@@ -1350,7 +1368,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 log.debug("Partition map after single update: " + fullMapString());
 
             if (changed)
-                cctx.shared().exchange().scheduleResendPartitions();
+                ctx.exchange().scheduleResendPartitions();
 
             return changed ? localPartitionMap() : null;
         }
@@ -1364,7 +1382,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         lock.writeLock().lock();
 
         try {
-            int parts = cctx.affinity().partitions();
+            int parts = grp.affinity().partitions();
 
             Collection<Integer> lost = null;
 
@@ -1398,7 +1416,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             boolean changed = false;
 
             if (lost != null) {
-                PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
+                PartitionLossPolicy plc = grp.config().getPartitionLossPolicy();
 
                 assert plc != null;
 
@@ -1430,13 +1448,17 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                         }
                     }
 
-                    if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST))
-                        cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST,
-                            discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+                    if (grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                        grp.addRebalanceEvent(part,
+                            EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                            discoEvt.eventNode(),
+                            discoEvt.type(),
+                            discoEvt.timestamp());
+                    }
                 }
 
                 if (plc != PartitionLossPolicy.IGNORE)
-                    cctx.needsRecovery(true);
+                    grp.needsRecovery(true);
             }
 
             return changed;
@@ -1451,7 +1473,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         lock.writeLock().lock();
 
         try {
-            int parts = cctx.affinity().partitions();
+            int parts = grp.affinity().partitions();
             long updSeq = updateSeq.incrementAndGet();
 
             for (int part = 0; part < parts; part++) {
@@ -1490,9 +1512,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 }
             }
 
-            checkEvictions(updSeq, cctx.affinity().assignments(topVer));
+            checkEvictions(updSeq, grp.affinity().assignments(topVer));
 
-            cctx.needsRecovery(false);
+            grp.needsRecovery(false);
         }
         finally {
             lock.writeLock().unlock();
@@ -1506,7 +1528,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         try {
             Collection<Integer> res = null;
 
-            int parts = cctx.affinity().partitions();
+            int parts = grp.affinity().partitions();
 
             for (int part = 0; part < parts; part++) {
                 Set<UUID> nodeIds = part2node.get(part);
@@ -1544,7 +1566,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             GridDhtLocalPartition locPart = locParts.get(p);
 
             if (locPart != null) {
-                if (locPart.state() == OWNING && !owners.contains(cctx.localNodeId())) {
+                if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId())) {
                     if (haveHistory)
                         locPart.moving();
                     else {
@@ -1552,7 +1574,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
                         locPart.reload(true);
 
-                        result.add(cctx.localNodeId());
+                        result.add(ctx.localNodeId());
                     }
 
                 }
@@ -1588,12 +1610,12 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
      * @return {@code True} if state changed.
      */
     private boolean checkEvictions(long updateSeq) {
-        AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion affVer = grp.affinity().lastVersion();
 
         boolean changed = false;
 
         if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
-            List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+            List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
 
             changed = checkEvictions(updateSeq, aff);
 
@@ -1625,12 +1647,12 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
      * @return Checks if any of the local partitions need to be evicted.
      */
     private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
-        if (!cctx.kernalContext().state().active())
+        if (!ctx.kernalContext().state().active())
             return false;
 
         boolean changed = false;
 
-        UUID locId = cctx.nodeId();
+        UUID locId = ctx.localNodeId();
 
         for (int p = 0; p < locParts.length(); p++) {
             GridDhtLocalPartition part = locParts.get(p);
@@ -1643,7 +1665,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             if (state.active()) {
                 List<ClusterNode> affNodes = aff.get(p);
 
-                if (!affNodes.contains(cctx.localNode())) {
+                if (!affNodes.contains(ctx.localNode())) {
                     List<ClusterNode> nodes = nodes(p, topVer, OWNING);
                     Collection<UUID> nodeIds = F.nodeIds(nodes);
 
@@ -1710,10 +1732,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
         ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
-        assert oldest != null || cctx.kernalContext().clientNode();
+        assert oldest != null || ctx.kernalContext().clientNode();
 
         // If this node became the oldest node.
-        if (cctx.localNode().equals(oldest)) {
+        if (ctx.localNode().equals(oldest)) {
             long seq = node2part.updateSequence();
 
             if (seq != updateSeq) {
@@ -1740,7 +1762,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         }
 
         if (node2part != null) {
-            UUID locNodeId = cctx.localNodeId();
+            UUID locNodeId = ctx.localNodeId();
 
         GridDhtPartitionMap map = node2part.get(locNodeId);
 
@@ -1777,9 +1799,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
         ClusterNode oldest = discoCache.oldestAliveServerNode();
 
-        assert oldest != null || cctx.kernalContext().clientNode();
+        assert oldest != null || ctx.kernalContext().clientNode();
 
-        ClusterNode loc = cctx.localNode();
+        ClusterNode loc = ctx.localNode();
 
         if (node2part != null) {
             if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) {
@@ -1927,11 +1949,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
-                ", cache=" + cctx.name() +
-                ", started=" + cctx.started() +
+                ", grp=" + grp.cacheOrGroupName() +
                 ", stopping=" + stopping +
-                ", locNodeId=" + cctx.localNode().id() +
-                ", locName=" + cctx.igniteInstanceName() + ']';
+                ", locNodeId=" + ctx.localNodeId() +
+                ", locName=" + ctx.igniteInstanceName() + ']';
 
             for (GridDhtPartitionMap map : node2part.values()) {
                 if (map.hasMovingPartitions())
@@ -1945,10 +1966,25 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         }
     }
 
+    /**
+     * @param cacheId Cache ID.
+     */
+    public void onCacheStopped(int cacheId) {
+        if (!grp.sharedGroup())
+            return;
+
+        for (int i = 0; i < locParts.length(); i++) {
+            GridDhtLocalPartition part = locParts.get(i);
+
+            if (part != null)
+                part.onCacheStopped(cacheId);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
-        X.println(">>>  Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() +
-            ", cache=" + cctx.name() + ']');
+        X.println(">>>  Cache partition topology stats [igniteInstanceName=" + ctx.igniteInstanceName() +
+            ", grp=" + grp.cacheOrGroupName() + ']');
 
         lock.readLock().lock();
 
@@ -1959,7 +1995,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 if (part == null)
                     continue;
 
-                int size = part.dataStore().size();
+                int size = part.dataStore().fullSize();
 
                 if (size >= threshold)
                     X.println(">>>   Local partition [part=" + part.id() + ", size=" + size + ']');
@@ -1976,7 +2012,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
      * @return {@code True} if given partition belongs to local node.
      */
     private boolean localNode(int part, List<List<ClusterNode>> aff) {
-        return aff.get(part).contains(cctx.localNode());
+        return aff.get(part).contains(ctx.localNode());
     }
 
     /**
@@ -1987,7 +2023,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             if (node2part == null || !node2part.valid())
                 return;
 
-            for (int i = 0; i < cctx.affinity().partitions(); i++) {
+            for (int i = 0; i < grp.affinity().partitions(); i++) {
                 List<ClusterNode> affNodes = aff.get(i);
 
                 // Topology doesn't contain server nodes (just clients).
@@ -2003,7 +2039,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             rebalancedTopVer = topVer;
 
             if (log.isDebugEnabled())
-                log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']');
+                log.debug("Updated rebalanced version [cache=" + grp.cacheOrGroupName() + ", ver=" + rebalancedTopVer + ']');
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index c91eb7a..d607ff1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -47,7 +47,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
@@ -118,51 +119,61 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        preldr = new GridDhtPreloader(ctx);
-
-        preldr.start();
-
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() {
             @Override public void apply(UUID nodeId, GridNearGetRequest req) {
                 processNearGetRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
             @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) {
                 processNearSingleGetRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() {
             @Override public void apply(UUID nodeId, GridNearLockRequest req) {
                 processNearLockRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() {
             @Override public void apply(UUID nodeId, GridDhtLockRequest req) {
                 processDhtLockRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() {
             @Override public void apply(UUID nodeId, GridDhtLockResponse req) {
                 processDhtLockResponse(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() {
             @Override public void apply(UUID nodeId, GridNearUnlockRequest req) {
                 processNearUnlockRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() {
             @Override public void apply(UUID nodeId, GridDhtUnlockRequest req) {
                 processDhtUnlockRequest(nodeId, req);
             }
         });
+
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysRequest.class,
+            new MessageHandler<GridDhtForceKeysRequest>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
+                    processForceKeysRequest(node, msg);
+                }
+            });
+
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysResponse.class,
+            new MessageHandler<GridDhtForceKeysResponse>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
+                    processForceKeyResponse(node, msg);
+                }
+            });
     }
 
     /** {@inheritDoc} */
@@ -382,7 +393,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         }
 
         IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null :
-            ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion());
+            ctx.group().preloader().request(ctx, req.keys(), req.topologyVersion());
 
         if (keyFut == null || keyFut.isDone()) {
             if (keyFut != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index d777a22..6d717eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -173,19 +173,19 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
         }
 
         switch (writer.state()) {
-            case 7:
+            case 6:
                 if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 7:
                 if (!writer.writeInt("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 8:
                 if (!writer.writeMessage("retVal", retVal))
                     return false;
 
@@ -207,7 +207,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
             return false;
 
         switch (reader.state()) {
-            case 7:
+            case 6:
                 checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes");
 
                 if (!reader.isLastRead())
@@ -215,7 +215,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
-            case 8:
+            case 7:
                 miniId = reader.readInt("miniId");
 
                 if (!reader.isLastRead())
@@ -223,7 +223,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
-            case 9:
+            case 8:
                 retVal = reader.readMessage("retVal");
 
                 if (!reader.isLastRead())
@@ -243,7 +243,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 10;
+        return 9;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
index c483408..67eacd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
@@ -47,6 +47,16 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
     /**
      *
      * @param vers Near Tx xid Versions.
@@ -87,7 +97,7 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
         }
 
         switch (writer.state()) {
-            case 3:
+            case 2:
                 if (!writer.writeCollection("vers", vers, MessageCollectionItemType.MSG))
                     return false;
 
@@ -109,7 +119,7 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
             return false;
 
         switch (reader.state()) {
-            case 3:
+            case 2:
                 vers = reader.readCollection("vers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -129,6 +139,6 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 3;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index e2b7803..bd238d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1086,7 +1086,9 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
 
             Collection<KeyCacheObject> keys = entry.getValue();
 
-            lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
+            GridCacheContext ctx = cctx.cacheContext(cacheId);
+
+            lastForceFut = ctx.group().preloader().request(ctx, keys, tx.topologyVersion());
 
             if (compFut != null && lastForceFut != null)
                 compFut.add(lastForceFut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 6452abc..f29c507 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -458,7 +458,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                 GridCacheVersion ver = null;
 
                 if (readNoEntry) {
-                    CacheDataRow row = cctx.offheap().read(key);
+                    CacheDataRow row = cctx.offheap().read(cctx, key);
 
                     if (row != null) {
                         long expireTime = row.expireTime();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index e5df64e..9811f8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -359,7 +359,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 boolean skipEntry = readNoEntry;
 
                 if (readNoEntry) {
-                    CacheDataRow row = cctx.offheap().read(key);
+                    CacheDataRow row = cctx.offheap().read(cctx, key);
 
                     if (row != null) {
                         long expireTime = row.expireTime();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 579796d..d2dc817 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -40,7 +40,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  *
  */
-public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheIdMessage implements GridCacheDeployable {
     /** Skip store flag bit mask. */
     private static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 0dafa2b..2f99033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -70,7 +70,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -193,19 +194,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override protected GridCacheMapEntryFactory entryFactory() {
-        return new GridCacheMapEntryFactory() {
-            @Override public GridCacheMapEntry create(
-                GridCacheContext ctx,
-                AffinityTopologyVersion topVer,
-                KeyCacheObject key
-            ) {
-                return new GridDhtAtomicCacheEntry(ctx, topVer, key);
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
     @Override protected void init() {
         super.init();
 
@@ -239,11 +227,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         metrics = m;
 
-        preldr = new GridDhtPreloader(ctx);
-
-        preldr.start();
-
-        ctx.io().addHandler(
+        ctx.io().addCacheHandler(
             ctx.cacheId(),
             GridNearGetRequest.class,
             new CI2<UUID, GridNearGetRequest>() {
@@ -257,7 +241,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(
+        ctx.io().addCacheHandler(
             ctx.cacheId(),
             GridNearSingleGetRequest.class,
             new CI2<UUID, GridNearSingleGetRequest>() {
@@ -271,7 +255,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(
+        ctx.io().addCacheHandler(
             ctx.cacheId(),
             GridNearAtomicAbstractUpdateRequest.class,
             new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() {
@@ -290,7 +274,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(ctx.cacheId(),
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
             GridNearAtomicUpdateResponse.class,
             new CI2<UUID, GridNearAtomicUpdateResponse>() {
                 @Override public void apply(
@@ -308,7 +293,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(
+        ctx.io().addCacheHandler(
             ctx.cacheId(),
             GridDhtAtomicAbstractUpdateRequest.class,
             new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() {
@@ -327,7 +312,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(
+        ctx.io().addCacheHandler(
             ctx.cacheId(),
             GridDhtAtomicUpdateResponse.class,
             new CI2<UUID, GridDhtAtomicUpdateResponse>() {
@@ -346,7 +331,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(ctx.cacheId(),
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
             GridDhtAtomicDeferredUpdateResponse.class,
             new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() {
                 @Override public void apply(
@@ -364,7 +350,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(ctx.cacheId(),
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
             GridDhtAtomicNearResponse.class,
             new CI2<UUID, GridDhtAtomicNearResponse>() {
             @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) {
@@ -377,7 +364,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(),
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
             GridNearAtomicCheckUpdateRequest.class,
             new CI2<UUID, GridNearAtomicCheckUpdateRequest>() {
                 @Override public void apply(UUID uuid, GridNearAtomicCheckUpdateRequest msg) {
@@ -390,8 +378,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
+            GridDhtForceKeysRequest.class,
+            new MessageHandler<GridDhtForceKeysRequest>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
+                    processForceKeysRequest(node, msg);
+                }
+            });
+
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
+            GridDhtForceKeysResponse.class,
+            new MessageHandler<GridDhtForceKeysResponse>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
+                    processForceKeyResponse(node, msg);
+                }
+            });
+
         if (near == null) {
-            ctx.io().addHandler(
+            ctx.io().addCacheHandler(
                 ctx.cacheId(),
                 GridNearGetResponse.class,
                 new CI2<UUID, GridNearGetResponse>() {
@@ -405,7 +411,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
                 });
 
-            ctx.io().addHandler(
+            ctx.io().addCacheHandler(
                 ctx.cacheId(),
                 GridNearSingleGetResponse.class,
                 new CI2<UUID, GridNearSingleGetResponse>() {
@@ -1486,7 +1492,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 // Optimistically expect that all keys are available locally (avoid creation of get future).
                 for (KeyCacheObject key : keys) {
                     if (readNoEntry) {
-                        CacheDataRow row = ctx.offheap().read(key);
+                        CacheDataRow row = ctx.offheap().read(ctx, key);
 
                         if (row != null) {
                             long expireTime = row.expireTime();
@@ -1662,7 +1668,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final GridNearAtomicAbstractUpdateRequest req,
         final UpdateReplyClosure completionCb
     ) {
-        IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
+        IgniteInternalFuture<Object> forceFut = ctx.group().preloader().request(ctx, req, req.topologyVersion());
 
         if (forceFut == null || forceFut.isDone()) {
             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
deleted file mode 100644
index b0c9a64..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * DHT atomic cache entry.
- */
-public class GridDhtAtomicCacheEntry extends GridDhtCacheEntry {
-    /**
-     * @param ctx Cache context.
-     * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
-     * @param key Cache key.
-     */
-    GridDhtAtomicCacheEntry(
-        GridCacheContext ctx,
-        AffinityTopologyVersion topVer,
-        KeyCacheObject key
-    ) {
-        super(ctx, topVer, key);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected String cacheName() {
-        return CU.isNearEnabled(cctx) ? super.cacheName() : cctx.dht().name();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized String toString() {
-        return S.toString(GridDhtAtomicCacheEntry.class, this, super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index 92ef149..0c069da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridLongList;
@@ -35,7 +35,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Deferred dht atomic update response.
  */
-public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtAtomicDeferredUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index d6e2db0..71d2321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -36,7 +36,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic
 /**
  * Message sent from DHT nodes to near node in FULL_SYNC mode.
  */
-public class GridDhtAtomicNearResponse extends GridCacheMessage {
+public class GridDhtAtomicNearResponse extends GridCacheIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 693d658..7b2547a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
@@ -27,7 +26,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -39,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * DHT atomic cache backup update response.
  */
-public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtAtomicUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 4b3ea5bc..bb47af4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -44,7 +44,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  *
  */
-public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMessage implements GridCacheDeployable {
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
index 4b9109e..96be023 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -27,7 +27,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  *
  */
-public class GridNearAtomicCheckUpdateRequest extends GridCacheMessage {
+public class GridNearAtomicCheckUpdateRequest extends GridCacheIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 55953ea..5ba024f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -45,7 +45,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * DHT atomic cache near update response.
  */
-public class GridNearAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable {
+public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 12a3912..708df49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException;
-import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -116,35 +115,22 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     }
 
     /** {@inheritDoc} */
-    @Override protected GridCacheMapEntryFactory entryFactory() {
-        return new GridCacheMapEntryFactory() {
-            @Override public GridCacheMapEntry create(
-                GridCacheContext ctx,
-                AffinityTopologyVersion topVer,
-                KeyCacheObject key
-            ) {
-                return new GridDhtColocatedCacheEntry(ctx, topVer, key);
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
             @Override public void apply(UUID nodeId, GridNearGetResponse res) {
                 processNearGetResponse(nodeId, res);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
             @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) {
                 processNearSingleGetResponse(nodeId, res);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() {
             @Override public void apply(UUID nodeId, GridNearLockResponse res) {
                 processLockResponse(nodeId, res);
             }
@@ -467,7 +453,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
                 for (KeyCacheObject key : keys) {
                     if (readNoEntry) {
-                        CacheDataRow row = ctx.offheap().read(key);
+                        CacheDataRow row = ctx.offheap().read(ctx, key);
 
                         if (row != null) {
                             long expireTime = row.expireTime();
@@ -941,7 +927,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     ) {
         assert keys != null;
 
-        IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
+        IgniteInternalFuture<Object> keyFut = ctx.group().preloader().request(cacheCtx, keys, topVer);
 
         // Prevent embedded future creation if possible.
         if (keyFut == null || keyFut.isDone()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
deleted file mode 100644
index f7cc5a7..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
-
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Cache entry for colocated cache.
- */
-public class GridDhtColocatedCacheEntry extends GridDhtCacheEntry {
-    /**
-     * @param ctx Cache context.
-     * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
-     * @param key Cache key.
-     */
-    GridDhtColocatedCacheEntry(
-        GridCacheContext ctx,
-        AffinityTopologyVersion topVer,
-        KeyCacheObject key
-    ) {
-        super(ctx, topVer, key);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected String cacheName() {
-        return cctx.colocated().name();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized String toString() {
-        return S.toString(GridDhtColocatedCacheEntry.class, this, super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index d72d8db..763b43b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -102,9 +102,6 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
     /** Future ID. */
     private IgniteUuid futId = IgniteUuid.randomUuid();
 
-    /** Preloader. */
-    private GridDhtPreloader preloader;
-
     /** Trackable flag. */
     private boolean trackable;
 
@@ -112,21 +109,19 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param cctx Cache context.
      * @param topVer Topology version.
      * @param keys Keys.
-     * @param preloader Preloader.
      */
     public GridDhtForceKeysFuture(
         GridCacheContext<K, V> cctx,
         AffinityTopologyVersion topVer,
-        Collection<KeyCacheObject> keys,
-        GridDhtPreloader preloader
+        Collection<KeyCacheObject> keys
     ) {
         assert topVer.topologyVersion() != 0 : topVer;
         assert !F.isEmpty(keys) : keys;
+        assert !cctx.isNear();
 
         this.cctx = cctx;
         this.keys = keys;
         this.topVer = topVer;
-        this.preloader = preloader;
 
         top = cctx.dht().topology();
 
@@ -158,7 +153,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
     @Override public boolean onDone(@Nullable Collection<K> res, @Nullable Throwable err) {
         if (super.onDone(res, err)) {
             if (trackable)
-                preloader.remoteFuture(this);
+                cctx.dht().removeFuture(this);
 
             return true;
         }
@@ -170,7 +165,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param evt Discovery event.
      */
     @SuppressWarnings( {"unchecked"})
-    void onDiscoveryEvent(DiscoveryEvent evt) {
+    public void onDiscoveryEvent(DiscoveryEvent evt) {
         topCntr.incrementAndGet();
 
         int type = evt.type();
@@ -244,7 +239,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
 
             int curTopVer = topCntr.get();
 
-            if (!preloader.addFuture(this)) {
+            if (!cctx.dht().addFuture(this)) {
                 assert isDone() : this;
 
                 return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
index d129ae8..124ae44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -40,7 +40,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
  * Force keys request. This message is sent by node while preloading to force
  * another node to put given keys into the next batch of transmitting entries.
  */
-public class GridDhtForceKeysRequest extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtForceKeysRequest extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 


Mime
View raw message