ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [21/31] ignite git commit: ignite-5075 Implement logical 'cache groups' sharing the same physical caches
Date Tue, 06 Jun 2017 10:01:42 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 6fb557a..cba9477 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -19,7 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -30,30 +33,33 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
@@ -70,6 +76,17 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  * Key partition.
  */
 public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements Comparable<GridDhtLocalPartition>, GridReservable {
+    /** */
+    private static final GridCacheMapEntryFactory ENTRY_FACTORY = new GridCacheMapEntryFactory() {
+        @Override public GridCacheMapEntry create(
+            GridCacheContext ctx,
+            AffinityTopologyVersion topVer,
+            KeyCacheObject key
+        ) {
+            return new GridDhtCacheEntry(ctx, topVer, key);
+        }
+    };
+
     /** Maximum size for delete queue. */
     public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000);
 
@@ -100,29 +117,48 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     @GridToStringExclude
     private final GridFutureAdapter<?> rent;
 
-    /** Context. */
-    private final GridCacheContext cctx;
+    /** */
+    @GridToStringExclude
+    private final GridCacheSharedContext ctx;
+
+    /** */
+    @GridToStringExclude
+    private final CacheGroupContext grp;
 
     /** Create time. */
     @GridToStringExclude
     private final long createTime = U.currentTimeMillis();
 
     /** Eviction history. */
+    @GridToStringExclude
     private volatile Map<KeyCacheObject, GridCacheVersion> evictHist = new HashMap<>();
 
     /** Lock. */
+    @GridToStringExclude
     private final ReentrantLock lock = new ReentrantLock();
 
+    /** */
+    @GridToStringExclude
+    private final ConcurrentMap<Integer, CacheMapHolder> cacheMaps;
+
+    /** */
+    @GridToStringExclude
+    private final CacheMapHolder singleCacheEntryMap;
+
     /** Remove queue. */
+    @GridToStringExclude
     private final ConcurrentLinkedDeque8<RemovedEntryHolder> rmvQueue = new ConcurrentLinkedDeque8<>();
 
     /** Group reservations. */
+    @GridToStringExclude
     private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
 
     /** */
+    @GridToStringExclude
     private final CacheDataStore store;
 
     /** Partition updates. */
+    @GridToStringExclude
     private final ConcurrentNavigableMap<Long, Boolean> updates = new ConcurrentSkipListMap<>();
 
     /** Last applied update. */
@@ -133,18 +169,30 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     private volatile boolean shouldBeRenting;
 
     /**
-     * @param cctx Context.
+     * @param ctx Context.
+     * @param grp Cache group.
      * @param id Partition ID.
-     * @param entryFactory Entry factory.
      */
     @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
-    GridDhtLocalPartition(GridCacheContext cctx, int id, GridCacheMapEntryFactory entryFactory) {
-        super(cctx, entryFactory, Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / cctx.affinity().partitions()));
+    GridDhtLocalPartition(GridCacheSharedContext ctx,
+        CacheGroupContext grp,
+        int id) {
+        super(ENTRY_FACTORY);
 
         this.id = id;
-        this.cctx = cctx;
+        this.ctx = ctx;
+        this.grp = grp;
+
+        log = U.logger(ctx.kernalContext(), logRef, this);
 
-        log = U.logger(cctx.kernalContext(), logRef, this);
+        if (grp.sharedGroup()) {
+            singleCacheEntryMap = null;
+            cacheMaps = new ConcurrentHashMap<>();
+        }
+        else {
+            singleCacheEntryMap = new CacheMapHolder(grp.singleCacheContext(), createEntriesMap());
+            cacheMaps = null;
+        }
 
         rent = new GridFutureAdapter<Object>() {
             @Override public String toString() {
@@ -152,15 +200,15 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
             }
         };
 
-        int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
-            Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
+        int delQueueSize = grp.systemCache() ? 100 :
+            Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20);
 
         rmvQueueMaxSize = U.ceilPow2(delQueueSize);
 
         rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000);
 
         try {
-            store = cctx.offheap().createCacheDataStore(id);
+            store = grp.offheap().createCacheDataStore(id);
         }
         catch (IgniteCheckedException e) {
             // TODO ignite-db
@@ -169,6 +217,62 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @return Entries map.
+     */
+    private ConcurrentMap<KeyCacheObject, GridCacheMapEntry> createEntriesMap() {
+        return new ConcurrentHashMap8<>(Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / grp.affinity().partitions()),
+            0.75f,
+            Runtime.getRuntime().availableProcessors() * 2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int internalSize() {
+        if (grp.sharedGroup()) {
+            int size = 0;
+
+            for (CacheMapHolder hld : cacheMaps.values())
+                size += hld.map.size();
+
+            return size;
+        }
+
+        return singleCacheEntryMap.map.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMapHolder entriesMap(GridCacheContext cctx) {
+        if (grp.sharedGroup())
+            return cacheMapHolder(cctx);
+
+        return singleCacheEntryMap;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) {
+        return grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap;
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @return Map holder.
+     */
+    private CacheMapHolder cacheMapHolder(GridCacheContext cctx) {
+        assert grp.sharedGroup();
+
+        CacheMapHolder hld = cacheMaps.get(cctx.cacheIdBoxed());
+
+        if (hld != null)
+            return hld;
+
+        CacheMapHolder  old = cacheMaps.putIfAbsent(cctx.cacheIdBoxed(), hld = new CacheMapHolder(cctx, createEntriesMap()));
+
+        if (old != null)
+            hld = old;
+
+        return hld;
+    }
+
+    /**
      * @return Data store.
      */
     public CacheDataStore dataStore() {
@@ -235,10 +339,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @return {@code True} if partition is empty.
      */
     public boolean isEmpty() {
-        if (cctx.allowFastEviction())
+        if (grp.allowFastEviction())
             return internalSize() == 0;
 
-        return store.size() == 0 && internalSize() == 0;
+        return store.fullSize() == 0 && internalSize() == 0;
     }
 
     /**
@@ -307,6 +411,20 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @param ver Version.
+     */
+    private void removeVersionedEntry(int cacheId, KeyCacheObject key, GridCacheVersion ver) {
+        CacheMapHolder hld = grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap;
+
+        GridCacheMapEntry entry = hld != null ? hld.map.get(key) : null;
+
+        if (entry != null && entry.markObsoleteVersion(ver))
+            removeEntry(entry);
+    }
+
+    /**
      *
      */
     public void cleanupRemoveQueue() {
@@ -314,10 +432,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
             RemovedEntryHolder item = rmvQueue.pollFirst();
 
             if (item != null)
-                cctx.dht().removeVersionedEntry(item.key(), item.version());
+                removeVersionedEntry(item.cacheId(), item.key(), item.version());
         }
 
-        if (!cctx.isDrEnabled()) {
+        if (!grp.isDrEnabled()) {
             RemovedEntryHolder item = rmvQueue.peekFirst();
 
             while (item != null && item.expireTime() < U.currentTimeMillis()) {
@@ -326,7 +444,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
                 if (item == null)
                     break;
 
-                cctx.dht().removeVersionedEntry(item.key(), item.version());
+                removeVersionedEntry(item.cacheId(), item.key(), item.version());
 
                 item = rmvQueue.peekFirst();
             }
@@ -334,13 +452,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @param cacheId cacheId Cache ID.
      * @param key Removed key.
      * @param ver Removed version.
      */
-    public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) {
+    public void onDeferredDelete(int cacheId, KeyCacheObject key, GridCacheVersion ver) {
         cleanupRemoveQueue();
 
-        rmvQueue.add(new RemovedEntryHolder(key, ver, rmvdEntryTtl));
+        rmvQueue.add(new RemovedEntryHolder(cacheId, key, ver, rmvdEntryTtl));
     }
 
     /**
@@ -438,7 +557,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /** {@inheritDoc} */
-    @Override protected void release(int sizeChange, GridCacheEntryEx e) {
+    @Override protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) {
+        if (grp.sharedGroup() && sizeChange != 0)
+            hld.size.addAndGet(sizeChange);
+
         release0(sizeChange);
     }
 
@@ -486,16 +608,16 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @return {@code true} if cas succeeds.
      */
     private boolean casState(long state, GridDhtPartitionState toState) {
-        if (cctx.shared().database().persistenceEnabled()) {
+        if (ctx.database().persistenceEnabled()) {
             synchronized (this) {
                 boolean update = this.state.compareAndSet(state, setPartState(state, toState));
 
                 if (update)
                     try {
-                        cctx.shared().wal().log(new PartitionMetaStateRecord(cctx.cacheId(), id, toState, updateCounter()));
+                        ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, updateCounter()));
                     }
                     catch (IgniteCheckedException e) {
-                        log.error("Error while writing to log", e);
+                        U.error(log, "Error while writing to log", e);
                     }
 
                 return update;
@@ -610,13 +732,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @param updateSeq Update sequence.
      */
     void tryEvictAsync(boolean updateSeq) {
-        assert cctx.kernalContext().state().active();
+        assert ctx.kernalContext().state().active();
 
         long state = this.state.get();
 
         GridDhtPartitionState partState = getPartState(state);
 
-        if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) && getSize(state) == 0 &&
+        if (isEmpty() && !grp.queriesEnabled() && getSize(state) == 0 &&
             partState == RENTING && getReservations(state) == 0 && !groupReserved() &&
             casState(state, EVICTED)) {
             if (log.isDebugEnabled())
@@ -626,7 +748,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
                 finishDestroy(updateSeq);
         }
         else if (partState == RENTING || shouldBeRenting())
-            cctx.preloader().evictPartitionAsync(this);
+            grp.preloader().evictPartitionAsync(this);
     }
 
     /**
@@ -702,18 +824,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
         assert state() == EVICTED : this;
         assert evictGuard.get() == -1;
 
-        if (cctx.isDrEnabled())
-            cctx.dr().partitionEvicted(id);
-
-        cctx.continuousQueries().onPartitionEvicted(id);
-
-        cctx.dataStructures().onPartitionEvicted(id);
+        grp.onPartitionEvicted(id);
 
         destroyCacheDataStore();
 
         rent.onDone();
 
-        ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
+        ((GridDhtPreloader)grp.preloader()).onPartitionEvicted(this, updateSeq);
 
         clearDeferredDeletes();
     }
@@ -753,7 +870,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
         try {
             CacheDataStore store = dataStore();
 
-            cctx.offheap().destroyCacheDataStore(id, store);
+            grp.offheap().destroyCacheDataStore(id, store);
         }
         catch (IgniteCheckedException e) {
             log.error("Unable to destroy cache data store on partition eviction [id=" + id + "]", e);
@@ -777,7 +894,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @return {@code True} if local node is primary for this partition.
      */
     public boolean primary(AffinityTopologyVersion topVer) {
-        return cctx.affinity().primaryByPartition(cctx.localNode(), id, topVer);
+        List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id);
+
+        return !nodes.isEmpty() && ctx.localNode().equals(nodes.get(0));
     }
 
     /**
@@ -785,14 +904,23 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @return {@code True} if local node is backup for this partition.
      */
     public boolean backup(AffinityTopologyVersion topVer) {
-        return cctx.affinity().backupByPartition(cctx.localNode(), id, topVer);
+        List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id);
+
+        return nodes.indexOf(ctx.localNode()) > 0;
     }
 
     /**
+     * @param cacheId ID of cache initiated counter update.
+     * @param topVer Topology version for current operation.
      * @return Next update index.
      */
-    public long nextUpdateCounter() {
-        return store.nextUpdateCounter();
+    long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer, boolean primary, @Nullable Long primaryCntr) {
+        long nextCntr = store.nextUpdateCounter();
+
+        if (grp.sharedGroup())
+            grp.onPartitionCounterUpdate(cacheId, id, primaryCntr != null ? primaryCntr : nextCntr, topVer, primary);
+
+        return nextCntr;
     }
 
     /**
@@ -829,86 +957,53 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @throws NodeStoppingException If node stopping.
      */
     public void clearAll() throws NodeStoppingException {
-        GridCacheVersion clearVer = cctx.versions().next();
-
-        boolean rec = cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
-
-        Iterator<GridCacheMapEntry> it = allEntries().iterator();
+        GridCacheVersion clearVer = ctx.versions().next();
 
         GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer);
 
-        while (it.hasNext()) {
-            GridCacheMapEntry cached = null;
-
-            cctx.shared().database().checkpointReadLock();
-
-            try {
-                cached = it.next();
-
-                if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
-                    removeEntry(cached);
-
-                    if (!cached.isInternal()) {
-                        if (rec) {
-                            cctx.events().addEvent(cached.partition(),
-                                cached.key(),
-                                cctx.localNodeId(),
-                                (IgniteUuid)null,
-                                null,
-                                EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
-                                null,
-                                false,
-                                cached.rawGet(),
-                                cached.hasValue(),
-                                null,
-                                null,
-                                null,
-                                false);
-                        }
-                    }
-                }
-            }
-            catch (GridDhtInvalidPartitionException e) {
-                assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
-
-                break; // Partition is already concurrently cleared and evicted.
-            }
-            catch (NodeStoppingException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to clear cache entry for evicted partition: " + cached.partition());
-
-                rent.onDone(e);
+        boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
 
-                throw e;
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e);
-            }
-            finally {
-                cctx.shared().database().checkpointReadUnlock();
-            }
+        if (grp.sharedGroup()) {
+            for (CacheMapHolder hld : cacheMaps.values())
+                clear(hld.map, extras, rec);
         }
+        else
+            clear(singleCacheEntryMap.map, extras, rec);
+
+        if (!grp.allowFastEviction()) {
+            CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
 
-        if (!cctx.allowFastEviction()) {
             try {
-                GridIterator<CacheDataRow> it0 = cctx.offheap().iterator(id);
+                GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id);
 
                 while (it0.hasNext()) {
-                    cctx.shared().database().checkpointReadLock();
+                    ctx.database().checkpointReadLock();
 
                     try {
                         CacheDataRow row = it0.next();
 
-                        GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(cctx.affinity().affinityTopologyVersion(),
+                        if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId())) {
+                            hld = cacheMaps.get(row.cacheId());
+
+                            if (hld == null)
+                                continue;
+                        }
+
+                        assert hld != null;
+
+                        GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
+                            hld,
+                            hld.cctx,
+                            grp.affinity().lastVersion(),
                             row.key(),
                             true,
                             false);
 
                         if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
                             if (rec) {
-                                cctx.events().addEvent(cached.partition(),
+                                hld.cctx.events().addEvent(cached.partition(),
                                     cached.key(),
-                                    cctx.localNodeId(),
+                                    ctx.localNodeId(),
                                     (IgniteUuid)null,
                                     null,
                                     EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
@@ -929,7 +1024,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
                         break; // Partition is already concurrently cleared and evicted.
                     }
                     finally {
-                        cctx.shared().database().checkpointReadUnlock();
+                        ctx.database().checkpointReadUnlock();
                     }
                 }
             }
@@ -948,11 +1043,70 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @param map Map to clear.
+     * @param extras Obsolete extras.
+     * @param evt Unload event flag.
+     * @throws NodeStoppingException
+     */
+    private void clear(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map,
+        GridCacheObsoleteEntryExtras extras,
+        boolean evt) throws NodeStoppingException {
+        Iterator<GridCacheMapEntry> it = map.values().iterator();
+
+        while (it.hasNext()) {
+            GridCacheMapEntry cached = null;
+
+            ctx.database().checkpointReadLock();
+
+            try {
+                cached = it.next();
+
+                if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(extras.obsoleteVersion(), extras)) {
+                    removeEntry(cached);
+
+                    if (!cached.isInternal()) {
+                        if (evt) {
+                            grp.addCacheEvent(cached.partition(),
+                                cached.key(),
+                                ctx.localNodeId(),
+                                EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+                                null,
+                                false,
+                                cached.rawGet(),
+                                cached.hasValue(),
+                                false);
+                        }
+                    }
+                }
+            }
+            catch (GridDhtInvalidPartitionException e) {
+                assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
+
+                break; // Partition is already concurrently cleared and evicted.
+            }
+            catch (NodeStoppingException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to clear cache entry for evicted partition: " + cached.partition());
+
+                rent.onDone(e);
+
+                throw e;
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e);
+            }
+            finally {
+                ctx.database().checkpointReadUnlock();
+            }
+        }
+    }
+
+    /**
      *
      */
     private void clearDeferredDeletes() {
         for (RemovedEntryHolder e : rmvQueue)
-            cctx.dht().removeVersionedEntry(e.key(), e.version());
+            removeVersionedEntry(e.cacheId(), e.key(), e.version());
     }
 
     /** {@inheritDoc} */
@@ -977,6 +1131,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtLocalPartition.class, this,
+            "grp", grp.cacheOrGroupName(),
             "state", state(),
             "reservations", reservations(),
             "empty", isEmpty(),
@@ -984,12 +1139,25 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /** {@inheritDoc} */
-    @Override public int publicSize() {
+    @Override public int publicSize(int cacheId) {
+        if (grp.sharedGroup()) {
+            CacheMapHolder hld = cacheMaps.get(cacheId);
+
+            return hld != null ? hld.size.get() : 0;
+        }
+
         return getSize(state.get());
     }
 
     /** {@inheritDoc} */
-    @Override public void incrementPublicSize(GridCacheEntryEx e) {
+    @Override public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) {
+        if (grp.sharedGroup()) {
+            if (hld == null)
+                hld = cacheMapHolder(e.context());
+
+            hld.size.incrementAndGet();
+        }
+
         while (true) {
             long state = this.state.get();
 
@@ -999,7 +1167,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /** {@inheritDoc} */
-    @Override public void decrementPublicSize(GridCacheEntryEx e) {
+    @Override public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) {
+        if (grp.sharedGroup()) {
+            if (hld == null)
+                hld = cacheMapHolder(e.context());
+
+            hld.size.decrementAndGet();
+        }
+
         while (true) {
             long state = this.state.get();
 
@@ -1011,6 +1186,22 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @param cacheId Cache ID.
+     */
+    void onCacheStopped(int cacheId) {
+        assert grp.sharedGroup() : grp.cacheOrGroupName();
+
+        for (Iterator<RemovedEntryHolder> it = rmvQueue.iterator(); it.hasNext();) {
+            RemovedEntryHolder e = it.next();
+
+            if (e.cacheId() == cacheId)
+                it.remove();
+        }
+
+        cacheMaps.remove(cacheId);
+    }
+
+    /**
      * @param state Composite state.
      * @return Partition state.
      */
@@ -1065,6 +1256,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * Removed entry holder.
      */
     private static class RemovedEntryHolder {
+        /** */
+        private final int cacheId;
+
         /** Cache key */
         private final KeyCacheObject key;
 
@@ -1075,11 +1269,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
         private final long expireTime;
 
         /**
+         * @param cacheId Cache ID.
          * @param key Key.
          * @param ver Entry version.
          * @param ttl TTL.
          */
-        private RemovedEntryHolder(KeyCacheObject key, GridCacheVersion ver, long ttl) {
+        private RemovedEntryHolder(int cacheId, KeyCacheObject key, GridCacheVersion ver, long ttl) {
+            this.cacheId = cacheId;
             this.key = key;
             this.ver = ver;
 
@@ -1087,6 +1283,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
         }
 
         /**
+         * @return Cache ID.
+         */
+        int cacheId() {
+            return cacheId;
+        }
+
+        /**
          * @return Key.
          */
         KeyCacheObject key() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
index ea6ca06..87abd6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
@@ -174,7 +174,7 @@ public class GridDhtLockResponse extends GridDistributedLockResponse {
         }
 
         if (preloadEntries != null)
-            marshalInfos(preloadEntries, cctx);
+            marshalInfos(preloadEntries, cctx.shared(), cctx.cacheObjectContext());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index ffc1d63..d365a8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -88,9 +88,9 @@ public interface GridDhtPartitionTopology {
     public boolean stopping();
 
     /**
-     * @return Cache ID.
+     * @return Cache group ID.
      */
-    public int cacheId();
+    public int groupId();
 
     /**
      * Pre-initializes this topology.
@@ -134,13 +134,12 @@ public interface GridDhtPartitionTopology {
     public void releasePartitions(int... parts);
 
     /**
-     * @param key Cache key.
-     * @param create If {@code true}, then partition will be created if it's not there.
+     * @param part Partition number.
      * @return Local partition.
      * @throws GridDhtInvalidPartitionException If partition is evicted or absent and
      *      does not belong to this node.
      */
-    @Nullable public GridDhtLocalPartition localPartition(Object key, boolean create)
+    @Nullable public GridDhtLocalPartition localPartition(int part)
         throws GridDhtInvalidPartitionException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 7adce6e..248d44e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -42,9 +42,9 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.ClusterState;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@ -72,7 +72,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  * Partition topology.
  */
 @GridToStringExclude
-class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
+public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** If true, then check consistency. */
     private static final boolean CONSISTENCY_CHECK = false;
 
@@ -82,8 +82,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** */
     private static final Long ZERO = 0L;
 
-    /** Context. */
-    private final GridCacheContext<?, ?> cctx;
+    /** */
+    private final GridCacheSharedContext ctx;
+
+    /** */
+    private final CacheGroupContext grp;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -118,9 +121,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** Lock. */
     private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16);
 
-    /** */
-    private final GridCacheMapEntryFactory entryFactory;
-
     /** Partition update counter. */
     private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>();
 
@@ -131,23 +131,25 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private volatile boolean treatAllPartAsLoc;
 
     /**
-     * @param cctx Context.
-     * @param entryFactory Entry factory.
+     * @param ctx Cache shared context.
+     * @param grp Cache group.
      */
-    GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx, GridCacheMapEntryFactory entryFactory) {
-        assert cctx != null;
+    public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx,
+        CacheGroupContext grp) {
+        assert ctx != null;
+        assert grp != null;
 
-        this.cctx = cctx;
-        this.entryFactory = entryFactory;
+        this.ctx = ctx;
+        this.grp = grp;
 
-        log = cctx.logger(getClass());
+        log = ctx.logger(getClass());
 
-        locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions());
+        locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions());
     }
 
     /** {@inheritDoc} */
-    @Override public int cacheId() {
-        return cctx.cacheId();
+    @Override public int groupId() {
+        return grp.groupId();
     }
 
     /**
@@ -171,7 +173,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             topVer = AffinityTopologyVersion.NONE;
 
-            discoCache = cctx.discovery().discoCache();
+            discoCache = ctx.discovery().discoCache();
         }
         finally {
             lock.writeLock().unlock();
@@ -235,13 +237,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                             if (dumpCnt++ < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
                                 U.warn(log, "Failed to wait for partition eviction [" +
                                     "topVer=" + topVer +
-                                    ", cache=" + cctx.name() +
+                                    ", group=" + grp.cacheOrGroupName() +
                                     ", part=" + part.id() +
                                     ", partState=" + part.state() +
                                     ", size=" + part.internalSize() +
                                     ", reservations=" + part.reservations() +
                                     ", grpReservations=" + part.groupReserved() +
-                                    ", node=" + cctx.localNodeId() + "]");
+                                    ", node=" + ctx.localNodeId() + "]");
 
                                 if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
                                     U.dumpThreads(log);
@@ -329,7 +331,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         AffinityTopologyVersion topVer = this.topVer;
 
         assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
-            ", cacheName=" + cctx.name() + ']';
+            ", group=" + grp.cacheOrGroupName() + ']';
 
         return topVer;
     }
@@ -371,7 +373,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param updateSeq Update sequence.
      */
     private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
-        ClusterNode loc = cctx.localNode();
+        ClusterNode loc = ctx.localNode();
 
         ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
@@ -379,23 +381,23 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         assert topVer.equals(exchFut.topologyVersion()) :
             "Invalid topology [topVer=" + topVer +
-                ", cache=" + cctx.name() +
+                ", grp=" + grp.cacheOrGroupName() +
                 ", futVer=" + exchFut.topologyVersion() +
                 ", fut=" + exchFut + ']';
-        assert cctx.affinity().affinityTopologyVersion().equals(exchFut.topologyVersion()) :
-            "Invalid affinity [topVer=" + cctx.affinity().affinityTopologyVersion() +
-                ", cache=" + cctx.name() +
+        assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) :
+            "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
+                ", grp=" + grp.cacheOrGroupName() +
                 ", futVer=" + exchFut.topologyVersion() +
                 ", fut=" + exchFut + ']';
 
-        List<List<ClusterNode>> aff = cctx.affinity().assignments(exchFut.topologyVersion());
+        List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion());
 
-        int num = cctx.affinity().partitions();
+        int num = grp.affinity().partitions();
 
-        if (cctx.rebalanceEnabled()) {
-            boolean added = exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom());
+        if (grp.rebalanceEnabled()) {
+            boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
 
-            boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
+            boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined());
 
             if (first) {
                 assert exchId.isJoined() || added;
@@ -406,7 +408,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                         boolean owned = locPart.own();
 
-                        assert owned : "Failed to own partition for oldest node [cacheName" + cctx.name() +
+                        assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() +
                             ", part=" + locPart + ']';
 
                         if (log.isDebugEnabled())
@@ -465,7 +467,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param updateSeq Update sequence.
      */
     private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) {
-        int num = cctx.affinity().partitions();
+        int num = grp.affinity().partitions();
 
         for (int p = 0; p < num; p++) {
             if (node2part != null && node2part.valid()) {
@@ -487,26 +489,25 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady)
         throws IgniteCheckedException {
-
         DiscoveryEvent discoEvt = exchFut.discoveryEvent();
 
         ClusterState newState = exchFut.newClusterState();
 
         treatAllPartAsLoc = (newState != null && newState == ClusterState.ACTIVE)
-            || (cctx.kernalContext().state().active()
+            || (ctx.kernalContext().state().active()
             && discoEvt.type() == EventType.EVT_NODE_JOINED
             && discoEvt.eventNode().isLocal()
-            && !cctx.kernalContext().clientNode()
+            && !ctx.kernalContext().clientNode()
         );
 
         // Wait for rent outside of checkpoint lock.
         waitForRent();
 
-        ClusterNode loc = cctx.localNode();
+        ClusterNode loc = ctx.localNode();
 
-        cctx.shared().database().checkpointReadLock();
+        ctx.database().checkpointReadLock();
 
-        synchronized (cctx.shared().exchange().interruptLock()) {
+        synchronized (ctx.exchange().interruptLock()) {
             if (Thread.currentThread().isInterrupted())
                 throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread());
 
@@ -514,7 +515,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 U.writeLock(lock);
             }
             catch (IgniteInterruptedCheckedException e) {
-                cctx.shared().database().checkpointReadUnlock();
+                ctx.database().checkpointReadUnlock();
 
                 throw e;
             }
@@ -541,7 +542,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 cntrMap.clear();
 
                 // If this is the oldest node.
-                if (oldest != null && (loc.equals(oldest) || exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()))) {
+                if (oldest != null && (loc.equals(oldest) || exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))) {
                     if (node2part == null) {
                         node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -568,7 +569,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (affReady)
                     initPartitions0(exchFut, updateSeq);
                 else {
-                    List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
+                    List<List<ClusterNode>> aff = grp.affinity().idealAssignment();
 
                     createPartitions(aff, updateSeq);
                 }
@@ -582,7 +583,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             finally {
                 lock.writeLock().unlock();
 
-                cctx.shared().database().checkpointReadUnlock();
+                ctx.database().checkpointReadUnlock();
             }
         }
 
@@ -590,19 +591,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         waitForRent();
     }
 
+    /**
+     * @param p Partition number.
+     * @param topVer Topology version.
+     * @return {@code True} if given partition belongs to local node.
+     */
+    private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) {
+        return grp.affinity().nodes(p, topVer).contains(ctx.localNode());
+    }
+
     /** {@inheritDoc} */
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
         treatAllPartAsLoc = false;
 
         boolean changed = waitForRent();
 
-        int num = cctx.affinity().partitions();
+        int num = grp.affinity().partitions();
 
         AffinityTopologyVersion topVer = exchFut.topologyVersion();
 
-        assert cctx.affinity().affinityTopologyVersion().equals(topVer) : "Affinity is not initialized " +
+        assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not initialized " +
             "[topVer=" + topVer +
-            ", affVer=" + cctx.affinity().affinityTopologyVersion() +
+            ", affVer=" + grp.affinity().lastVersion() +
             ", fut=" + exchFut + ']';
 
         lock.writeLock().lock();
@@ -623,7 +633,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             for (int p = 0; p < num; p++) {
                 GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
 
-                if (cctx.affinity().partitionLocalNode(p, topVer)) {
+                if (partitionLocalNode(p, topVer)) {
                     // This partition will be created during next topology event,
                     // which obviously has not happened at this point.
                     if (locPart == null) {
@@ -636,26 +646,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     GridDhtPartitionState state = locPart.state();
 
                     if (state == MOVING) {
-                        if (cctx.rebalanceEnabled()) {
+                        if (grp.rebalanceEnabled()) {
                             Collection<ClusterNode> owners = owners(p);
 
                             // If there are no other owners, then become an owner.
                             if (F.isEmpty(owners)) {
                                 boolean owned = locPart.own();
 
-                                assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" +
+                                assert owned : "Failed to own partition [grp=" + grp.cacheOrGroupName() + ", locPart=" +
                                     locPart + ']';
 
                                 updateSeq = updateLocal(p, locPart.state(), updateSeq);
 
                                 changed = true;
 
-                                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                                if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
                                     DiscoveryEvent discoEvt = exchFut.discoveryEvent();
 
-                                    cctx.events().addPreloadEvent(p,
-                                        EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
-                                        discoEvt.type(), discoEvt.timestamp());
+                                    grp.addRebalanceEvent(p,
+                                        EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                                        discoEvt.eventNode(),
+                                        discoEvt.type(),
+                                        discoEvt.timestamp());
                                 }
 
                                 if (log.isDebugEnabled())
@@ -673,7 +685,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     if (locPart != null) {
                         GridDhtPartitionState state = locPart.state();
 
-                        if (state == MOVING && cctx.kernalContext().state().active()) {
+                        if (state == MOVING && ctx.kernalContext().state().active()) {
                             locPart.rent(false);
 
                             updateSeq = updateLocal(p, locPart.state(), updateSeq);
@@ -687,7 +699,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            updateRebalanceVersion(cctx.affinity().assignments(topVer));
+            updateRebalanceVersion(grp.affinity().assignments(topVer));
 
             consistencyCheck();
         }
@@ -715,11 +727,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         GridDhtLocalPartition loc = locParts.get(p);
 
         if (loc == null || loc.state() == EVICTED) {
-            locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory));
+            locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
 
-            if (cctx.shared().pageStore() != null) {
+            if (ctx.pageStore() != null) {
                 try {
-                    cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p);
+                    ctx.pageStore().onPartitionCreated(grp.groupId(), p);
                 }
                 catch (IgniteCheckedException e) {
                     // TODO ignite-db
@@ -749,7 +761,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         GridDhtPartitionState state = loc != null ? loc.state() : null;
 
-        if (loc != null && state != EVICTED && (state != RENTING || !cctx.allowFastEviction()))
+        if (loc != null && state != EVICTED && (state != RENTING || !grp.allowFastEviction()))
             return loc;
 
         if (!create)
@@ -764,7 +776,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             state = loc != null ? loc.state() : null;
 
-            boolean belongs = cctx.affinity().partitionLocalNode(p, topVer);
+            boolean belongs = partitionLocalNode(p, topVer);
 
             if (loc != null && state == EVICTED) {
                 locParts.set(p, loc = null);
@@ -774,7 +786,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         "(often may be caused by inconsistent 'key.hashCode()' implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
             }
-            else if (loc != null && state == RENTING && cctx.allowFastEviction())
+            else if (loc != null && state == RENTING && grp.allowFastEviction())
                 throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted.");
 
             if (loc == null) {
@@ -783,7 +795,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
 
-                locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory));
+                locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
 
                 if (updateSeq)
                     this.updateSeq.incrementAndGet();
@@ -798,9 +810,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             lock.writeLock().unlock();
         }
 
-        if (created && cctx.shared().pageStore() != null) {
+        if (created && ctx.pageStore() != null) {
             try {
-                cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p);
+                ctx.pageStore().onPartitionCreated(grp.groupId(), p);
             }
             catch (IgniteCheckedException e) {
                 // TODO ignite-db
@@ -825,8 +837,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
-        return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create);
+    @Override public GridDhtLocalPartition localPartition(int part) {
+        return locParts.get(part);
     }
 
     /** {@inheritDoc} */
@@ -882,7 +894,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 map.put(i, part.state());
             }
 
-            return new GridDhtPartitionMap(cctx.nodeId(),
+            return new GridDhtPartitionMap(ctx.localNodeId(),
                 updateSeq.get(),
                 topVer,
                 Collections.unmodifiableMap(map),
@@ -922,7 +934,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
-        AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
+        AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer);
 
         List<ClusterNode> affNodes = affAssignment.get(p);
 
@@ -945,8 +957,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         try {
             assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer +
                 ", topVer2=" + this.topVer +
-                ", node=" + cctx.igniteInstanceName() +
-                ", cache=" + cctx.name() +
+                ", node=" + ctx.igniteInstanceName() +
+                ", grp=" + grp.cacheOrGroupName() +
                 ", node2part=" + node2part + ']';
 
             List<ClusterNode> nodes = null;
@@ -958,7 +970,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     HashSet<UUID> affIds = affAssignment.getIds(p);
 
                     if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) {
-                        ClusterNode n = cctx.discovery().node(nodeId);
+                        ClusterNode n = ctx.discovery().node(nodeId);
 
                         if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
                             if (nodes == null) {
@@ -991,7 +1003,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         AffinityTopologyVersion topVer,
         GridDhtPartitionState state,
         GridDhtPartitionState... states) {
-        Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null;
+        Collection<UUID> allIds = topVer.topologyVersion() > 0 ?
+            F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId())) :
+            null;
 
         lock.readLock().lock();
 
@@ -999,7 +1013,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
                 ", allIds=" + allIds +
                 ", node2part=" + node2part +
-                ", cache=" + cctx.name() + ']';
+                ", grp=" + grp.cacheOrGroupName() + ']';
 
             Collection<UUID> nodeIds = part2node.get(p);
 
@@ -1016,7 +1030,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     continue;
 
                 if (hasState(p, id, state, states)) {
-                    ClusterNode n = cctx.discovery().node(id);
+                    ClusterNode n = ctx.discovery().node(id);
 
                     if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
                         nodes.add(n);
@@ -1032,7 +1046,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) {
-        if (!cctx.rebalanceEnabled())
+        if (!grp.rebalanceEnabled())
             return ownersAndMoving(p, topVer);
 
         return nodes(p, topVer, OWNING);
@@ -1045,7 +1059,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> moving(int p) {
-        if (!cctx.rebalanceEnabled())
+        if (!grp.rebalanceEnabled())
             return ownersAndMoving(p, AffinityTopologyVersion.NONE);
 
         return nodes(p, AffinityTopologyVersion.NONE, MOVING);
@@ -1070,12 +1084,14 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         lock.readLock().lock();
 
         try {
-            assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
-                ", cache=" + cctx.name() +
-                ", started=" + cctx.started() +
+            if (node2part == null || stopping)
+                return null;
+
+            assert node2part.valid() : "Invalid node2part [node2part=" + node2part +
+                ", grp=" + grp.cacheOrGroupName() +
                 ", stopping=" + stopping +
-                ", locNodeId=" + cctx.localNode().id() +
-                ", locName=" + cctx.igniteInstanceName() + ']';
+                ", locNodeId=" + ctx.localNode().id() +
+                ", locName=" + ctx.igniteInstanceName() + ']';
 
             GridDhtPartitionFullMap m = node2part;
 
@@ -1157,7 +1173,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     // then we keep the newer value.
                     if (newPart != null &&
                         (newPart.updateSequence() < part.updateSequence() ||
-                        (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+                        (grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0))
                         ) {
                         if (log.isDebugEnabled())
                             log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
@@ -1171,7 +1187,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
                     UUID nodeId = it.next();
 
-                    if (!cctx.discovery().alive(nodeId)) {
+                    if (!ctx.discovery().alive(nodeId)) {
                         if (log.isDebugEnabled())
                             log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" +
                                 partMap + ']');
@@ -1183,7 +1199,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             node2part = partMap;
 
-            Map<Integer, Set<UUID>> p2n = new HashMap<>(cctx.affinity().partitions(), 1.0f);
+            Map<Integer, Set<UUID>> p2n = new HashMap<>(grp.affinity().partitions(), 1.0f);
 
             for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
                 for (Integer p : e.getValue().keySet()) {
@@ -1202,11 +1218,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             boolean changed = false;
 
-            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion affVer = grp.affinity().lastVersion();
 
-            GridDhtPartitionMap nodeMap = partMap.get(cctx.localNodeId());
+            GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId());
 
-            if (nodeMap != null && cctx.shared().database().persistenceEnabled()) {
+            if (nodeMap != null && ctx.database().persistenceEnabled()) {
                 for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) {
                     int p = e.getKey();
                     GridDhtPartitionState state = e.getValue();
@@ -1233,7 +1249,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             }
 
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
-                List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+                List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
 
                 changed |= checkEvictions(updateSeq, aff);
 
@@ -1246,7 +1262,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 log.debug("Partition map after full update: " + fullMapString());
 
             if (changed)
-                cctx.shared().exchange().scheduleResendPartitions();
+                ctx.exchange().scheduleResendPartitions();
 
             return changed ? localPartitionMap() : null;
         }
@@ -1299,7 +1315,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
 
-        if (!cctx.discovery().alive(parts.nodeId())) {
+        if (!ctx.discovery().alive(parts.nodeId())) {
             if (log.isDebugEnabled())
                 log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
                     ", parts=" + parts + ']');
@@ -1373,10 +1389,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion affVer = grp.affinity().lastVersion();
 
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
-                List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+                List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
 
                 changed |= checkEvictions(updateSeq, aff);
 
@@ -1389,7 +1405,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 log.debug("Partition map after single update: " + fullMapString());
 
             if (changed)
-                cctx.shared().exchange().scheduleResendPartitions();
+                ctx.exchange().scheduleResendPartitions();
 
             return changed ? localPartitionMap() : null;
         }
@@ -1403,7 +1419,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         lock.writeLock().lock();
 
         try {
-            int parts = cctx.affinity().partitions();
+            int parts = grp.affinity().partitions();
 
             Collection<Integer> lost = null;
 
@@ -1437,7 +1453,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             boolean changed = false;
 
             if (lost != null) {
-                PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
+                PartitionLossPolicy plc = grp.config().getPartitionLossPolicy();
 
                 assert plc != null;
 
@@ -1469,13 +1485,17 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         }
                     }
 
-                    if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST))
-                        cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST,
-                            discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+                    if (grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                        grp.addRebalanceEvent(part,
+                            EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                            discoEvt.eventNode(),
+                            discoEvt.type(),
+                            discoEvt.timestamp());
+                    }
                 }
 
                 if (plc != PartitionLossPolicy.IGNORE)
-                    cctx.needsRecovery(true);
+                    grp.needsRecovery(true);
             }
 
             return changed;
@@ -1490,7 +1510,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         lock.writeLock().lock();
 
         try {
-            int parts = cctx.affinity().partitions();
+            int parts = grp.affinity().partitions();
             long updSeq = updateSeq.incrementAndGet();
 
             for (int part = 0; part < parts; part++) {
@@ -1529,9 +1549,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            checkEvictions(updSeq, cctx.affinity().assignments(topVer));
+            checkEvictions(updSeq, grp.affinity().assignments(topVer));
 
-            cctx.needsRecovery(false);
+            grp.needsRecovery(false);
         }
         finally {
             lock.writeLock().unlock();
@@ -1545,7 +1565,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         try {
             Collection<Integer> res = null;
 
-            int parts = cctx.affinity().partitions();
+            int parts = grp.affinity().partitions();
 
             for (int part = 0; part < parts; part++) {
                 Set<UUID> nodeIds = part2node.get(part);
@@ -1582,7 +1602,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             GridDhtLocalPartition locPart = locParts.get(p);
 
             if (locPart != null) {
-                if (locPart.state() == OWNING && !owners.contains(cctx.localNodeId()))
+                if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId()))
                     locPart.moving();
             }
 
@@ -1607,12 +1627,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @return {@code True} if state changed.
      */
     private boolean checkEvictions(long updateSeq) {
-        AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion affVer = grp.affinity().lastVersion();
 
         boolean changed = false;
 
         if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
-            List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+            List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
 
             changed = checkEvictions(updateSeq, aff);
 
@@ -1644,12 +1664,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @return Checks if any of the local partitions need to be evicted.
      */
     private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
-        if (!cctx.kernalContext().state().active())
+        if (!ctx.kernalContext().state().active())
             return false;
 
         boolean changed = false;
 
-        UUID locId = cctx.nodeId();
+        UUID locId = ctx.localNodeId();
 
         for (int p = 0; p < locParts.length(); p++) {
             GridDhtLocalPartition part = locParts.get(p);
@@ -1662,7 +1682,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (state.active()) {
                 List<ClusterNode> affNodes = aff.get(p);
 
-                if (!affNodes.contains(cctx.localNode())) {
+                if (!affNodes.contains(ctx.localNode())) {
                     List<ClusterNode> nodes = nodes(p, topVer, OWNING);
                     Collection<UUID> nodeIds = F.nodeIds(nodes);
 
@@ -1725,10 +1745,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
         ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
-        assert oldest != null || cctx.kernalContext().clientNode();
+        assert oldest != null || ctx.kernalContext().clientNode();
 
         // If this node became the oldest node.
-        if (cctx.localNode().equals(oldest)) {
+        if (ctx.localNode().equals(oldest)) {
             long seq = node2part.updateSequence();
 
             if (seq != updateSeq) {
@@ -1755,7 +1775,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         }
 
         if (node2part != null) {
-            UUID locNodeId = cctx.localNodeId();
+            UUID locNodeId = ctx.localNodeId();
 
         GridDhtPartitionMap map = node2part.get(locNodeId);
 
@@ -1792,9 +1812,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         ClusterNode oldest = discoCache.oldestAliveServerNode();
 
-        assert oldest != null || cctx.kernalContext().clientNode();
+        assert oldest != null || ctx.kernalContext().clientNode();
 
-        ClusterNode loc = cctx.localNode();
+        ClusterNode loc = ctx.localNode();
 
         if (node2part != null) {
             if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) {
@@ -1939,11 +1959,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
-                ", cache=" + cctx.name() +
-                ", started=" + cctx.started() +
+                ", grp=" + grp.cacheOrGroupName() +
                 ", stopping=" + stopping +
-                ", locNodeId=" + cctx.localNode().id() +
-                ", locName=" + cctx.igniteInstanceName() + ']';
+                ", locNodeId=" + ctx.localNodeId() +
+                ", locName=" + ctx.igniteInstanceName() + ']';
 
             for (GridDhtPartitionMap map : node2part.values()) {
                 if (map.hasMovingPartitions())
@@ -1957,10 +1976,25 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         }
     }
 
+    /**
+     * @param cacheId Cache ID.
+     */
+    public void onCacheStopped(int cacheId) {
+        if (!grp.sharedGroup())
+            return;
+
+        for (int i = 0; i < locParts.length(); i++) {
+            GridDhtLocalPartition part = locParts.get(i);
+
+            if (part != null)
+                part.onCacheStopped(cacheId);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
-        X.println(">>>  Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() +
-            ", cache=" + cctx.name() + ']');
+        X.println(">>>  Cache partition topology stats [igniteInstanceName=" + ctx.igniteInstanceName() +
+            ", grp=" + grp.cacheOrGroupName() + ']');
 
         lock.readLock().lock();
 
@@ -1971,7 +2005,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (part == null)
                     continue;
 
-                int size = part.dataStore().size();
+                int size = part.dataStore().fullSize();
 
                 if (size >= threshold)
                     X.println(">>>   Local partition [part=" + part.id() + ", size=" + size + ']');
@@ -1988,7 +2022,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @return {@code True} if given partition belongs to local node.
      */
     private boolean localNode(int part, List<List<ClusterNode>> aff) {
-        return aff.get(part).contains(cctx.localNode());
+        return aff.get(part).contains(ctx.localNode());
     }
 
     /**
@@ -1999,7 +2033,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (node2part == null || !node2part.valid())
                 return;
 
-            for (int i = 0; i < cctx.affinity().partitions(); i++) {
+            for (int i = 0; i < grp.affinity().partitions(); i++) {
                 List<ClusterNode> affNodes = aff.get(i);
 
                 // Topology doesn't contain server nodes (just clients).
@@ -2015,7 +2049,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             rebalancedTopVer = topVer;
 
             if (log.isDebugEnabled())
-                log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']');
+                log.debug("Updated rebalanced version [cache=" + grp.cacheOrGroupName() + ", ver=" + rebalancedTopVer + ']');
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index c91eb7a..d607ff1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -47,7 +47,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
@@ -118,51 +119,61 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        preldr = new GridDhtPreloader(ctx);
-
-        preldr.start();
-
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() {
             @Override public void apply(UUID nodeId, GridNearGetRequest req) {
                 processNearGetRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
             @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) {
                 processNearSingleGetRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() {
             @Override public void apply(UUID nodeId, GridNearLockRequest req) {
                 processNearLockRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() {
             @Override public void apply(UUID nodeId, GridDhtLockRequest req) {
                 processDhtLockRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() {
             @Override public void apply(UUID nodeId, GridDhtLockResponse req) {
                 processDhtLockResponse(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() {
             @Override public void apply(UUID nodeId, GridNearUnlockRequest req) {
                 processNearUnlockRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() {
             @Override public void apply(UUID nodeId, GridDhtUnlockRequest req) {
                 processDhtUnlockRequest(nodeId, req);
             }
         });
+
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysRequest.class,
+            new MessageHandler<GridDhtForceKeysRequest>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
+                    processForceKeysRequest(node, msg);
+                }
+            });
+
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysResponse.class,
+            new MessageHandler<GridDhtForceKeysResponse>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
+                    processForceKeyResponse(node, msg);
+                }
+            });
     }
 
     /** {@inheritDoc} */
@@ -382,7 +393,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         }
 
         IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null :
-            ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion());
+            ctx.group().preloader().request(ctx, req.keys(), req.topologyVersion());
 
         if (keyFut == null || keyFut.isDone()) {
             if (keyFut != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index d777a22..6d717eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -173,19 +173,19 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
         }
 
         switch (writer.state()) {
-            case 7:
+            case 6:
                 if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 7:
                 if (!writer.writeInt("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 8:
                 if (!writer.writeMessage("retVal", retVal))
                     return false;
 
@@ -207,7 +207,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
             return false;
 
         switch (reader.state()) {
-            case 7:
+            case 6:
                 checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes");
 
                 if (!reader.isLastRead())
@@ -215,7 +215,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
-            case 8:
+            case 7:
                 miniId = reader.readInt("miniId");
 
                 if (!reader.isLastRead())
@@ -223,7 +223,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
-            case 9:
+            case 8:
                 retVal = reader.readMessage("retVal");
 
                 if (!reader.isLastRead())
@@ -243,7 +243,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 10;
+        return 9;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
index c483408..67eacd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
@@ -47,6 +47,16 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
     /**
      *
      * @param vers Near Tx xid Versions.
@@ -87,7 +97,7 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
         }
 
         switch (writer.state()) {
-            case 3:
+            case 2:
                 if (!writer.writeCollection("vers", vers, MessageCollectionItemType.MSG))
                     return false;
 
@@ -109,7 +119,7 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
             return false;
 
         switch (reader.state()) {
-            case 3:
+            case 2:
                 vers = reader.readCollection("vers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -129,6 +139,6 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 3;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 8a674fb..75f8366 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1086,7 +1086,9 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
 
             Collection<KeyCacheObject> keys = entry.getValue();
 
-            lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
+            GridCacheContext ctx = cctx.cacheContext(cacheId);
+
+            lastForceFut = ctx.group().preloader().request(ctx, keys, tx.topologyVersion());
 
             if (compFut != null && lastForceFut != null)
                 compFut.add(lastForceFut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 362432c..5543cec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -458,7 +458,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                 GridCacheVersion ver = null;
 
                 if (readNoEntry) {
-                    CacheDataRow row = cctx.offheap().read(key);
+                    CacheDataRow row = cctx.offheap().read(cctx, key);
 
                     if (row != null) {
                         long expireTime = row.expireTime();

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 63ed9a8..3f612f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -359,7 +359,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 boolean skipEntry = readNoEntry;
 
                 if (readNoEntry) {
-                    CacheDataRow row = cctx.offheap().read(key);
+                    CacheDataRow row = cctx.offheap().read(cctx, key);
 
                     if (row != null) {
                         long expireTime = row.expireTime();
@@ -494,13 +494,13 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 if (skipVals)
                     setSkipValueResult(true, verVal.version());
                 else
-                    setResult(verVal.value() , verVal.version());
+                    setResult(verVal.value(), verVal.version());
             }
             else {
                 if (skipVals)
                     setSkipValueResult(false, null);
                 else
-                    setResult(null , null);
+                    setResult(null, null);
             }
         }
         else {


Mime
View raw message