ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [15/43] ignite git commit: ignite-5075 Implement logical 'cache groups' sharing the same physical caches
Date Thu, 08 Jun 2017 12:33:08 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
new file mode 100644
index 0000000..4844a55
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -0,0 +1,943 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.CacheRebalancingEvent;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.database.MemoryPolicy;
+import org.apache.ignite.internal.processors.cache.database.freelist.FreeList;
+import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+
+/**
+ *
+ */
+public class CacheGroupContext {
+    /**
+     * Unique group ID. Currently for shared group it is generated as group name hash,
+     * for non-shared as cache name hash (see {@link ClusterCachesInfo#checkCacheConflict}).
+     */
+    private final int grpId;
+
+    /** Node ID cache group was received from. */
+    private final UUID rcvdFrom;
+
+    /** Flag indicating that this cache group is in a recovery mode due to partitions loss. */
+    private boolean needsRecovery;
+
+    /** */
+    private final AffinityTopologyVersion locStartVer;
+
+    /** */
+    private final CacheConfiguration<?, ?> ccfg;
+
+    /** */
+    private final GridCacheSharedContext ctx;
+
+    /** */
+    private final boolean affNode;
+
+    /** */
+    private final CacheType cacheType;
+
+    /** */
+    private final byte ioPlc;
+
+    /** */
+    private final boolean depEnabled;
+
+    /** */
+    private final boolean storeCacheId;
+
+    /** */
+    private volatile List<GridCacheContext> caches;
+
+    /** */
+    private volatile List<GridCacheContext> contQryCaches;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private GridAffinityAssignmentCache aff;
+
+    /** */
+    private GridDhtPartitionTopologyImpl top;
+
+    /** */
+    private IgniteCacheOffheapManager offheapMgr;
+
+    /** */
+    private GridCachePreloader preldr;
+
+    /** */
+    private final MemoryPolicy memPlc;
+
+    /** */
+    private final CacheObjectContext cacheObjCtx;
+
+    /** */
+    private final FreeList freeList;
+
+    /** */
+    private final ReuseList reuseList;
+
+    /** */
+    private boolean drEnabled;
+
+    /** */
+    private boolean qryEnabled;
+
+    /**
+     * @param grpId Group ID.
+     * @param ctx Context.
+     * @param rcvdFrom Node ID cache group was received from.
+     * @param cacheType Cache type.
+     * @param ccfg Cache configuration.
+     * @param affNode Affinity node flag.
+     * @param memPlc Memory policy.
+     * @param cacheObjCtx Cache object context.
+     * @param freeList Free list.
+     * @param reuseList Reuse list.
+     * @param locStartVer Topology version when group was started on local node.
+     */
+    CacheGroupContext(
+        GridCacheSharedContext ctx,
+        int grpId,
+        UUID rcvdFrom,
+        CacheType cacheType,
+        CacheConfiguration ccfg,
+        boolean affNode,
+        MemoryPolicy memPlc,
+        CacheObjectContext cacheObjCtx,
+        FreeList freeList,
+        ReuseList reuseList,
+        AffinityTopologyVersion locStartVer) {
+        assert ccfg != null;
+        assert memPlc != null || !affNode;
+        assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']';
+
+        this.grpId = grpId;
+        this.rcvdFrom = rcvdFrom;
+        this.ctx = ctx;
+        this.ccfg = ccfg;
+        this.affNode = affNode;
+        this.memPlc = memPlc;
+        this.cacheObjCtx = cacheObjCtx;
+        this.freeList = freeList;
+        this.reuseList = reuseList;
+        this.locStartVer = locStartVer;
+        this.cacheType = cacheType;
+
+        ioPlc = cacheType.ioPolicy();
+
+        depEnabled = ctx.kernalContext().deploy().enabled() && !ctx.kernalContext().cacheObjects().isBinaryEnabled(ccfg);
+
+        storeCacheId = affNode && memPlc.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED;
+
+        log = ctx.kernalContext().log(getClass());
+
+        caches = new ArrayList<>();
+    }
+
+    /**
+     * @return {@code True} if this is cache group for one of system caches.
+     */
+    public boolean systemCache() {
+        return !sharedGroup() && CU.isSystemCache(ccfg.getName());
+    }
+
+    /**
+     * @return Node ID initiated cache group start.
+     */
+    public UUID receivedFrom() {
+        return rcvdFrom;
+    }
+
+    /**
+     * @return {@code True} if cacheId should be stored in data pages.
+     */
+    public boolean storeCacheIdInDataPage() {
+        return storeCacheId;
+    }
+
+    /**
+     * @return {@code True} if deployment is enabled.
+     */
+    public boolean deploymentEnabled() {
+        return depEnabled;
+    }
+
+    /**
+     * @return Preloader.
+     */
+    public GridCachePreloader preloader() {
+        return preldr;
+    }
+
+    /**
+     * @return IO policy for the given cache group.
+     */
+    public byte ioPolicy() {
+        return ioPlc;
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException {
+        addCacheContext(cctx);
+
+        offheapMgr.onCacheStarted(cctx);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return {@code True} if group contains cache with given name.
+     */
+    public boolean hasCache(String cacheName) {
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            if (caches.get(i).name().equals(cacheName))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param cctx Cache context.
+     */
+    private void addCacheContext(GridCacheContext cctx) {
+        assert cacheType.userCache() == cctx.userCache() : cctx.name();
+        assert grpId == cctx.groupId() : cctx.name();
+
+        ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches);
+
+        assert sharedGroup() || caches.isEmpty();
+
+        boolean add = caches.add(cctx);
+
+        assert add : cctx.name();
+
+        if (!qryEnabled && QueryUtils.isEnabled(cctx.config()))
+            qryEnabled = true;
+
+        if (!drEnabled && cctx.isDrEnabled())
+            drEnabled = true;
+
+        this.caches = caches;
+   }
+
+    /**
+     * @param cctx Cache context.
+     */
+    private void removeCacheContext(GridCacheContext cctx) {
+        ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches);
+
+        // It is possible cache was not added in case of errors on cache start.
+        for (Iterator<GridCacheContext> it = caches.iterator(); it.hasNext();) {
+            GridCacheContext next = it.next();
+
+            if (next == cctx) {
+                assert sharedGroup() || caches.size() == 1 : caches.size();
+
+                it.remove();
+
+                break;
+            }
+        }
+
+        if (QueryUtils.isEnabled(cctx.config())) {
+            boolean qryEnabled = false;
+
+            for (int i = 0; i < caches.size(); i++) {
+                if (QueryUtils.isEnabled(caches.get(i).config())) {
+                    qryEnabled = true;
+
+                    break;
+                }
+            }
+
+            this.qryEnabled = qryEnabled;
+        }
+
+        if (cctx.isDrEnabled()) {
+            boolean drEnabled = false;
+
+            for (int i = 0; i < caches.size(); i++) {
+                if (caches.get(i).isDrEnabled()) {
+                    drEnabled = true;
+
+                    break;
+                }
+            }
+
+            this.drEnabled = drEnabled;
+        }
+
+        this.caches = caches;
+    }
+
+    /**
+     * @return Cache context if group contains single cache.
+     */
+    public GridCacheContext singleCacheContext() {
+        List<GridCacheContext> caches = this.caches;
+
+        assert !sharedGroup() && caches.size() == 1;
+
+        return caches.get(0);
+    }
+
+    /**
+     *
+     */
+    public void unwindUndeploys() {
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            cctx.deploy().unwind(cctx);
+        }
+    }
+
+    /**
+     * @param type Event type to check.
+     * @return {@code True} if given event type should be recorded.
+     */
+    public boolean eventRecordable(int type) {
+        return cacheType.userCache() && ctx.gridEvents().isRecordable(type);
+    }
+
+    /**
+     * Adds rebalancing event.
+     *
+     * @param part Partition.
+     * @param type Event type.
+     * @param discoNode Discovery node.
+     * @param discoType Discovery event type.
+     * @param discoTs Discovery event timestamp.
+     */
+    public void addRebalanceEvent(int part, int type, ClusterNode discoNode, int discoType, long discoTs) {
+        assert discoNode != null;
+        assert type > 0;
+        assert discoType > 0;
+        assert discoTs > 0;
+
+        if (!eventRecordable(type))
+            LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type));
+
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            if (cctx.recordEvent(type)) {
+                cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+                    cctx.localNode(),
+                    "Cache rebalancing event.",
+                    type,
+                    part,
+                    discoNode,
+                    discoType,
+                    discoTs));
+            }
+        }
+    }
+    /**
+     * Adds partition unload event.
+     *
+     * @param part Partition.
+     */
+    public void addUnloadEvent(int part) {
+        if (!eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
+            LT.warn(log, "Added event without checking if event is recordable: " +
+                U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED));
+
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+                cctx.localNode(),
+                "Cache unloading event.",
+                EVT_CACHE_REBALANCE_PART_UNLOADED,
+                part,
+                null,
+                0,
+                0));
+        }
+    }
+
+    /**
+     * @param part Partition.
+     * @param key Key.
+     * @param evtNodeId Event node ID.
+     * @param type Event type.
+     * @param newVal New value.
+     * @param hasNewVal Has new value flag.
+     * @param oldVal Old values.
+     * @param hasOldVal Has old value flag.
+     * @param keepBinary Keep binary flag.
+     */
+    public void addCacheEvent(
+        int part,
+        KeyCacheObject key,
+        UUID evtNodeId,
+        int type,
+        @Nullable CacheObject newVal,
+        boolean hasNewVal,
+        @Nullable CacheObject oldVal,
+        boolean hasOldVal,
+        boolean keepBinary
+    ) {
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            cctx.events().addEvent(part,
+                key,
+                evtNodeId,
+                (IgniteUuid)null,
+                null,
+                type,
+                newVal,
+                hasNewVal,
+                oldVal,
+                hasOldVal,
+                null,
+                null,
+                null,
+                keepBinary);
+        }
+    }
+
+    /**
+     * @return {@code True} if contains cache with query indexing enabled.
+     */
+    public boolean queriesEnabled() {
+        return qryEnabled;
+    }
+
+    /**
+     * @return {@code True} if fast eviction is allowed.
+     */
+    public boolean allowFastEviction() {
+        return ctx.database().persistenceEnabled() && !queriesEnabled();
+    }
+
+    /**
+     * @return {@code True} in case replication is enabled.
+     */
+    public boolean isDrEnabled() {
+        return drEnabled;
+    }
+
+    /**
+     * @return Free List.
+     */
+    public FreeList freeList() {
+        return freeList;
+    }
+
+    /**
+     * @return Reuse List.
+     */
+    public ReuseList reuseList() {
+        return reuseList;
+    }
+
+    /**
+     * @return Cache object context.
+     */
+    public CacheObjectContext cacheObjectContext() {
+        return cacheObjCtx;
+    }
+
+    /**
+     * @return Cache shared context.
+     */
+    public GridCacheSharedContext shared() {
+        return ctx;
+    }
+
+    /**
+     * @return Memory policy.
+     */
+    public MemoryPolicy memoryPolicy() {
+        return memPlc;
+    }
+
+    /**
+     * @return {@code True} if local node is affinity node.
+     */
+    public boolean affinityNode() {
+        return affNode;
+    }
+
+    /**
+     * @return Topology.
+     */
+    public GridDhtPartitionTopology topology() {
+        if (top == null)
+            throw new IllegalStateException("Topology is not initialized: " + name());
+
+        return top;
+    }
+
+    /**
+     * @return Offheap manager.
+     */
+    public IgniteCacheOffheapManager offheap() {
+        return offheapMgr;
+    }
+
+    /**
+     * @return Current cache state. Must only be modified during exchange.
+     */
+    public boolean needsRecovery() {
+        return needsRecovery;
+    }
+
+    /**
+     * @param needsRecovery Needs recovery flag.
+     */
+    public void needsRecovery(boolean needsRecovery) {
+        this.needsRecovery = needsRecovery;
+    }
+
+    /**
+     * @return Topology version when group was started on local node.
+     */
+    public AffinityTopologyVersion localStartVersion() {
+        return locStartVer;
+    }
+
+    /**
+     * @return {@code True} if cache is local.
+     */
+    public boolean isLocal() {
+        return ccfg.getCacheMode() == LOCAL;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    public CacheConfiguration config() {
+        return ccfg;
+    }
+
+    /**
+     * @return Cache node filter.
+     */
+    public IgnitePredicate<ClusterNode> nodeFilter() {
+        return ccfg.getNodeFilter();
+    }
+
+    /**
+     * @return Configured user objects which should be initialized/stopped on group start/stop.
+     */
+    Collection<?> configuredUserObjects() {
+        return Arrays.asList(ccfg.getAffinity(), ccfg.getNodeFilter(), ccfg.getTopologyValidator());
+    }
+
+    /**
+     * @return Configured topology validator.
+     */
+    @Nullable public TopologyValidator topologyValidator() {
+        return ccfg.getTopologyValidator();
+    }
+
+    /**
+     * @return Configured affinity function.
+     */
+    public AffinityFunction affinityFunction() {
+        return ccfg.getAffinity();
+    }
+
+    /**
+     * @return Affinity.
+     */
+    public GridAffinityAssignmentCache affinity() {
+        return aff;
+    }
+
+    /**
+     * @return Group name or {@code null} if group name was not specified for cache.
+     */
+    @Nullable public String name() {
+        return ccfg.getGroupName();
+    }
+
+    /**
+     * @return Group name if it is specified, otherwise cache name.
+     */
+    public String cacheOrGroupName() {
+        return ccfg.getGroupName() != null ? ccfg.getGroupName() : ccfg.getName();
+    }
+
+    /**
+     * @return Group ID.
+     */
+    public int groupId() {
+        return grpId;
+    }
+
+    /**
+     * @return {@code True} if group can contain multiple caches.
+     */
+    public boolean sharedGroup() {
+        return ccfg.getGroupName() != null;
+    }
+
+    /**
+     *
+     */
+    public void onKernalStop() {
+        aff.cancelFutures(new IgniteCheckedException("Failed to wait for topology update, node is stopping."));
+
+        preldr.onKernalStop();
+
+        offheapMgr.onKernalStop();
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param destroy Destroy flag.
+     */
+    void stopCache(GridCacheContext cctx, boolean destroy) {
+        if (top != null)
+            top.onCacheStopped(cctx.cacheId());
+
+        offheapMgr.stopCache(cctx.cacheId(), destroy);
+
+        removeCacheContext(cctx);
+    }
+
+    /**
+     *
+     */
+    void stopGroup() {
+        IgniteCheckedException err =
+            new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.");
+
+        aff.cancelFutures(err);
+
+        offheapMgr.stop();
+
+        ctx.io().removeCacheGroupHandlers(grpId);
+    }
+
+    /**
+     * @return IDs of caches in this group.
+     */
+    public Set<Integer> cacheIds() {
+        List<GridCacheContext> caches = this.caches;
+
+        Set<Integer> ids = U.newHashSet(caches.size());
+
+        for (int i = 0; i < caches.size(); i++)
+            ids.add(caches.get(i).cacheId());
+
+        return ids;
+    }
+
+    /**
+     * @return {@code True} if group contains caches.
+     */
+    boolean hasCaches() {
+        List<GridCacheContext> caches = this.caches;
+
+        return !caches.isEmpty();
+    }
+
+    /**
+     * @param part Partition ID.
+     */
+    public void onPartitionEvicted(int part) {
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            if (cctx.isDrEnabled())
+                cctx.dr().partitionEvicted(part);
+
+            cctx.continuousQueries().onPartitionEvicted(part);
+
+            cctx.dataStructures().onPartitionEvicted(part);
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     */
+    public void addCacheWithContinuousQuery(GridCacheContext cctx) {
+        assert sharedGroup() : cacheOrGroupName();
+        assert cctx.group() == this : cctx.name();
+        assert !cctx.isLocal() : cctx.name();
+
+        synchronized (this) {
+            List<GridCacheContext> contQryCaches = this.contQryCaches;
+
+            if (contQryCaches == null)
+                contQryCaches = new ArrayList<>();
+
+            contQryCaches.add(cctx);
+
+            this.contQryCaches = contQryCaches;
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     */
+    public void removeCacheWithContinuousQuery(GridCacheContext cctx) {
+        assert sharedGroup() : cacheOrGroupName();
+        assert cctx.group() == this : cctx.name();
+        assert !cctx.isLocal() : cctx.name();
+
+        synchronized (this) {
+            List<GridCacheContext> contQryCaches = this.contQryCaches;
+
+            if (contQryCaches == null)
+                return;
+
+            contQryCaches.remove(cctx);
+
+            if (contQryCaches.isEmpty())
+                contQryCaches = null;
+
+            this.contQryCaches = contQryCaches;
+        }
+    }
+
+    /**
+     * @param cacheId ID of cache initiated counter update.
+     * @param part Partition number.
+     * @param cntr Counter.
+     * @param topVer Topology version for current operation.
+     */
+    public void onPartitionCounterUpdate(int cacheId,
+        int part,
+        long cntr,
+        AffinityTopologyVersion topVer,
+        boolean primary) {
+        assert sharedGroup();
+
+        if (isLocal())
+            return;
+
+        List<GridCacheContext> contQryCaches = this.contQryCaches;
+
+        if (contQryCaches == null)
+            return;
+
+        CounterSkipContext skipCtx = null;
+
+        for (int i = 0; i < contQryCaches.size(); i++) {
+            GridCacheContext cctx = contQryCaches.get(i);
+
+            if (cacheId != cctx.cacheId())
+                skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer, primary);
+        }
+
+        final List<Runnable> procC = skipCtx != null ? skipCtx.processClosures() : null;
+
+        if (procC != null) {
+            ctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    for (Runnable c : procC)
+                        c.run();
+                }
+            });
+        }
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void start() throws IgniteCheckedException {
+        aff = new GridAffinityAssignmentCache(ctx.kernalContext(),
+            cacheOrGroupName(),
+            grpId,
+            ccfg.getAffinity(),
+            ccfg.getNodeFilter(),
+            ccfg.getBackups(),
+            ccfg.getCacheMode() == LOCAL);
+
+        if (ccfg.getCacheMode() != LOCAL) {
+            top = new GridDhtPartitionTopologyImpl(ctx, this);
+
+            if (!ctx.kernalContext().clientNode()) {
+                ctx.io().addCacheGroupHandler(groupId(), GridDhtAffinityAssignmentRequest.class,
+                    new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentRequest>() {
+                        @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentRequest msg) {
+                            processAffinityAssignmentRequest(nodeId, msg);
+                        }
+                    });
+            }
+
+            preldr = new GridDhtPreloader(this);
+
+            preldr.start();
+        }
+        else
+            preldr = new GridCachePreloaderAdapter(this);
+
+        offheapMgr = new IgniteCacheOffheapManagerImpl();
+
+        offheapMgr.start(ctx, this);
+
+        ctx.affinity().onCacheGroupCreated(this);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    private void processAffinityAssignmentRequest(final UUID nodeId,
+        final GridDhtAffinityAssignmentRequest req) {
+        if (log.isDebugEnabled())
+            log.debug("Processing affinity assignment request [node=" + nodeId + ", req=" + req + ']');
+
+        IgniteInternalFuture<AffinityTopologyVersion> fut = aff.readyFuture(req.topologyVersion());
+
+        if (fut != null) {
+            fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                    processAffinityAssignmentRequest0(nodeId, req);
+                }
+            });
+        }
+        else
+            processAffinityAssignmentRequest0(nodeId, req);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    private void processAffinityAssignmentRequest0(UUID nodeId, final GridDhtAffinityAssignmentRequest req) {
+        AffinityTopologyVersion topVer = req.topologyVersion();
+
+        if (log.isDebugEnabled())
+            log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer +
+                ", node=" + nodeId + ']');
+
+        AffinityAssignment assignment = aff.cachedAffinity(topVer);
+
+        GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
+            req.futureId(),
+            grpId,
+            topVer,
+            assignment.assignment());
+
+        if (aff.centralizedAffinityFunction()) {
+            assert assignment.idealAssignment() != null;
+
+            res.idealAffinityAssignment(assignment.idealAssignment());
+        }
+
+        try {
+            ctx.io().send(nodeId, res, AFFINITY_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send affinity assignment response to remote node [node=" + nodeId + ']', e);
+        }
+    }
+
+    /**
+     * @param reconnectFut Reconnect future.
+     */
+    public void onDisconnected(IgniteFuture reconnectFut) {
+        IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
+            "Failed to wait for topology update, client disconnected.");
+
+        if (aff != null)
+            aff.cancelFutures(err);
+    }
+
+    /**
+     * @return {@code True} if rebalance is enabled.
+     */
+    public boolean rebalanceEnabled() {
+        return ccfg.getRebalanceMode() != NONE;
+    }
+
+    /**
+     *
+     */
+    public void onReconnected() {
+        aff.onReconnected();
+
+        if (top != null)
+            top.onReconnected();
+
+        preldr.onReconnected();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "CacheGroupContext [grp=" + cacheOrGroupName() + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
new file mode 100644
index 0000000..a290caf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class CacheGroupData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final int grpId;
+
+    /** */
+    private final String grpName;
+
+    /** */
+    private final AffinityTopologyVersion startTopVer;
+
+    /** */
+    private final UUID rcvdFrom;
+
+    /** */
+    private final IgniteUuid deploymentId;
+
+    /** */
+    private final CacheConfiguration<?, ?> cacheCfg;
+
+    /** */
+    @GridToStringInclude
+    private final Map<String, Integer> caches;
+
+    /** */
+    private long flags;
+
+    /**
+     * @param cacheCfg Cache configuration.
+     * @param grpName Group name.
+     * @param grpId  Group ID.
+     * @param rcvdFrom Node ID cache group received from.
+     * @param startTopVer Start version for dynamically started group.
+     * @param deploymentId Deployment ID.
+     * @param caches Cache group caches.
+     */
+    CacheGroupData(
+        CacheConfiguration cacheCfg,
+        @Nullable String grpName,
+        int grpId,
+        UUID rcvdFrom,
+        @Nullable AffinityTopologyVersion startTopVer,
+        IgniteUuid deploymentId,
+        Map<String, Integer> caches,
+        long flags) {
+        assert cacheCfg != null;
+        assert grpId != 0;
+        assert deploymentId != null;
+
+        this.cacheCfg = cacheCfg;
+        this.grpName = grpName;
+        this.grpId = grpId;
+        this.rcvdFrom = rcvdFrom;
+        this.startTopVer = startTopVer;
+        this.deploymentId = deploymentId;
+        this.caches = caches;
+        this.flags = flags;
+    }
+
+    /**
+     * @return Start version for dynamically started group.
+     */
+    @Nullable public AffinityTopologyVersion startTopologyVersion() {
+        return startTopVer;
+    }
+
+    /**
+     * @return Node ID group was received from.
+     */
+    public UUID receivedFrom() {
+        return rcvdFrom;
+    }
+
+    /**
+     * @return Group name.
+     */
+    @Nullable public String groupName() {
+        return grpName;
+    }
+
+    /**
+     * @return Group ID.
+     */
+    public int groupId() {
+        return grpId;
+    }
+
+    /**
+     * @return Deployment ID.
+     */
+    public IgniteUuid deploymentId() {
+        return deploymentId;
+    }
+
+    /**
+     * @return Configuration.
+     */
+    public CacheConfiguration<?, ?> config() {
+        return cacheCfg;
+    }
+
+    /**
+     * @return Group caches.
+     */
+    Map<String, Integer> caches() {
+        return caches;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheGroupData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
new file mode 100644
index 0000000..c4976e5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class CacheGroupDescriptor {
+    /** */
+    private final int grpId;
+
+    /** */
+    private final String grpName;
+
+    /** */
+    private final AffinityTopologyVersion startTopVer;
+
+    /** */
+    private final UUID rcvdFrom;
+
+    /** */
+    private final IgniteUuid deploymentId;
+
+    /** */
+    @GridToStringExclude
+    private final CacheConfiguration<?, ?> cacheCfg;
+
+    /** */
+    @GridToStringInclude
+    private Map<String, Integer> caches;
+
+    /** */
+    private AffinityTopologyVersion rcvdFromVer;
+
+    /**
+     * @param cacheCfg Cache configuration.
+     * @param grpName Group name.
+     * @param grpId  Group ID.
+     * @param rcvdFrom Node ID cache group received from.
+     * @param startTopVer Start version for dynamically started group.
+     * @param deploymentId Deployment ID.
+     * @param caches Cache group caches.
+     */
+    CacheGroupDescriptor(
+        CacheConfiguration cacheCfg,
+        @Nullable String grpName,
+        int grpId,
+        UUID rcvdFrom,
+        @Nullable AffinityTopologyVersion startTopVer,
+        IgniteUuid deploymentId,
+        Map<String, Integer> caches) {
+        assert cacheCfg != null;
+        assert grpId != 0;
+
+        this.grpName = grpName;
+        this.grpId = grpId;
+        this.rcvdFrom = rcvdFrom;
+        this.startTopVer = startTopVer;
+        this.deploymentId = deploymentId;
+        this.cacheCfg = cacheCfg;
+        this.caches = caches;
+    }
+
+    /**
+     * @return Node ID group was received from.
+     */
+    public UUID receivedFrom() {
+        return rcvdFrom;
+    }
+
+    /**
+     * @return Deployment ID.
+     */
+    public IgniteUuid deploymentId() {
+        return deploymentId;
+    }
+
+    /**
+     * @param cacheName Cache name
+     * @param cacheId Cache ID.
+     */
+    void onCacheAdded(String cacheName, int cacheId) {
+        assert cacheName != null;
+        assert cacheId != 0 : cacheName;
+
+        Map<String, Integer> caches = new HashMap<>(this.caches);
+
+        caches.put(cacheName, cacheId);
+
+        this.caches = caches;
+    }
+
+    /**
+     * @param cacheName Cache name
+     * @param cacheId Cache ID.
+     */
+    void onCacheStopped(String cacheName, int cacheId) {
+        assert cacheName != null;
+        assert cacheId != 0;
+
+        Map<String, Integer> caches = new HashMap<>(this.caches);
+
+        Integer rmvd = caches.remove(cacheName);
+
+        assert rmvd != null && rmvd == cacheId : cacheName;
+
+        this.caches = caches;
+    }
+
+    /**
+     * @return {@code True} if group contains cache.
+     */
+    boolean hasCaches() {
+        return caches != null && !caches.isEmpty();
+    }
+
+    /**
+     * @return {@code True} if group can contain multiple caches.
+     */
+    public boolean sharedGroup() {
+        return grpName != null;
+    }
+
+    /**
+     * @return Group name if it is specified, otherwise cache name.
+     */
+    public String cacheOrGroupName() {
+        return grpName != null ? grpName : cacheCfg.getName();
+    }
+
+    /**
+     * @return Group name or {@code null} if group name was not specified for cache.
+     */
+    @Nullable public String groupName() {
+        return grpName;
+    }
+
+    /**
+     * @return Group ID.
+     */
+    public int groupId() {
+        return grpId;
+    }
+
+    /**
+     * @return Configuration.
+     */
+    public CacheConfiguration<?, ?> config() {
+        return cacheCfg;
+    }
+
+    /**
+     * @return Group caches.
+     */
+    public Map<String, Integer> caches() {
+        return caches;
+    }
+
+    /**
+     * @return Topology version when node provided cache configuration was started.
+     */
+    @Nullable AffinityTopologyVersion receivedFromStartVersion() {
+        return rcvdFromVer;
+    }
+
+    /**
+     * @param rcvdFromVer Topology version when node provided cache configuration was started.
+     */
+    void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) {
+        this.rcvdFromVer = rcvdFromVer;
+    }
+
+    /**
+     * @return Start version for dynamically started group.
+     */
+    @Nullable public AffinityTopologyVersion startTopologyVersion() {
+        return startTopVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheGroupDescriptor.class, this, "cacheName", cacheCfg.getName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
index afc01c9..58c9d82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -111,7 +111,7 @@ class CacheJoinNodeDiscoveryData implements Serializable {
         private final boolean sql;
 
         /** Flags added for future usage. */
-        private final byte flags;
+        private final long flags;
 
         /**
          * @param ccfg Cache configuration.
@@ -119,7 +119,7 @@ class CacheJoinNodeDiscoveryData implements Serializable {
          * @param sql SQL flag - {@code true} if cache was created with {@code CREATE TABLE}.
          * @param flags Flags (for future usage).
          */
-        CacheInfo(CacheConfiguration ccfg, CacheType cacheType, boolean sql, byte flags) {
+        CacheInfo(CacheConfiguration ccfg, CacheType cacheType, boolean sql, long flags) {
             this.ccfg = ccfg;
             this.cacheType = cacheType;
             this.sql = sql;

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index d4ad8e4..c9ee641 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -195,7 +195,10 @@ public class CacheMetricsImpl implements CacheMetrics {
     /** {@inheritDoc} */
     @Override public long getOffHeapPrimaryEntriesCount() {
         try {
-            return cctx.offheap().entriesCount(true, false, cctx.affinity().affinityTopologyVersion());
+            return cctx.offheap().cacheEntriesCount(cctx.cacheId(),
+                true,
+                false,
+                cctx.affinity().affinityTopologyVersion());
         }
         catch (IgniteCheckedException ignored) {
             return 0;
@@ -205,7 +208,10 @@ public class CacheMetricsImpl implements CacheMetrics {
     /** {@inheritDoc} */
     @Override public long getOffHeapBackupEntriesCount() {
         try {
-            return cctx.offheap().entriesCount(false, true, cctx.affinity().affinityTopologyVersion());
+            return cctx.offheap().cacheEntriesCount(cctx.cacheId(),
+                false,
+                true,
+                cctx.affinity().affinityTopologyVersion());
         }
         catch (IgniteCheckedException ignored) {
             return 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
index 84a33dc..4c70cb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
@@ -39,22 +39,41 @@ class CacheNodeCommonDiscoveryData implements Serializable {
     private final Map<String, CacheData> templates;
 
     /** */
+    @GridToStringInclude
+    private final Map<Integer, CacheGroupData> cacheGrps;
+
+    /** */
     private final Map<String, Map<UUID, Boolean>> clientNodesMap;
 
     /**
      * @param caches Started caches.
      * @param templates Configured templates.
+     * @param cacheGrps Started cache groups.
      * @param clientNodesMap Information about cache client nodes.
      */
     CacheNodeCommonDiscoveryData(Map<String, CacheData> caches,
         Map<String, CacheData> templates,
+        Map<Integer, CacheGroupData> cacheGrps,
         Map<String, Map<UUID, Boolean>> clientNodesMap) {
+        assert caches != null;
+        assert templates != null;
+        assert cacheGrps != null;
+        assert clientNodesMap != null;
+
         this.caches = caches;
         this.templates = templates;
+        this.cacheGrps = cacheGrps;
         this.clientNodesMap = clientNodesMap;
     }
 
     /**
+     * @return Started cache groups.
+     */
+    Map<Integer, CacheGroupData> cacheGroups() {
+        return cacheGrps;
+    }
+
+    /**
      * @return Started caches.
      */
     Map<String, CacheData> caches() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
index f8e9f32..d737c8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
@@ -41,15 +41,10 @@ public class CacheOffheapEvictionManager extends GridCacheManagerAdapter impleme
             return;
 
         try {
-            if (e.markObsoleteIfEmpty(null) || e.obsolete()) {
-                e.context().cache().removeEntry(e);
+            boolean evicted = e.evictInternal(GridCacheVersionManager.EVICT_VER, null, false)
+                || e.markObsoleteIfEmpty(null);
 
-                return;
-            }
-
-            boolean evicted = e.evictInternal(GridCacheVersionManager.EVICT_VER, null, false);
-
-            if (evicted)
+            if (evicted && !e.isDht()) // GridDhtCacheEntry removes entry when obsoleted.
                 cctx.cache().removeEntry(e);
         }
         catch (IgniteCheckedException ex) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index e4d2668..0fcc740 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -45,8 +45,11 @@ import org.apache.ignite.internal.processors.query.schema.SchemaOperationExcepti
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -63,6 +66,9 @@ class ClusterCachesInfo {
     /** Dynamic caches. */
     private final ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
 
+    /** */
+    private final ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGrps = new ConcurrentHashMap<>();
+
     /** Cache templates. */
     private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
 
@@ -70,13 +76,13 @@ class ClusterCachesInfo {
     private final IgniteLogger log;
 
     /** */
-    private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+    private CachesOnDisconnect cachesOnDisconnect;
 
     /** */
     private CacheJoinNodeDiscoveryData joinDiscoData;
 
     /** */
-    private CacheNodeCommonDiscoveryData gridData;
+    private GridData gridData;
 
     /** */
     private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
@@ -95,11 +101,40 @@ class ClusterCachesInfo {
 
     /**
      * @param joinDiscoData Information about configured caches and templates.
+     * @throws IgniteCheckedException If configuration validation failed.
      */
-    void onStart(CacheJoinNodeDiscoveryData joinDiscoData) {
+    void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheckedException {
         this.joinDiscoData = joinDiscoData;
 
-        processJoiningNode(joinDiscoData, ctx.localNodeId());
+        Map<String, CacheConfiguration> grpCfgs = new HashMap<>();
+
+        for (CacheJoinNodeDiscoveryData.CacheInfo info : joinDiscoData.caches().values()) {
+            if (info.config().getGroupName() == null)
+                continue;
+
+            CacheConfiguration ccfg = grpCfgs.get(info.config().getGroupName());
+
+            if (ccfg == null)
+                grpCfgs.put(info.config().getGroupName(), info.config());
+            else
+                validateCacheGroupConfiguration(ccfg, info.config());
+        }
+
+        String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true);
+
+        if (conflictErr != null)
+            throw new IgniteCheckedException("Failed to start configured cache. " + conflictErr);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param grpName Group name.
+     * @return Group ID.
+     */
+    private int cacheGroupId(String cacheName, @Nullable String grpName) {
+        assert cacheName != null;
+
+        return grpName != null ? CU.cacheId(grpName) : CU.cacheId(cacheName);
     }
 
     /**
@@ -107,14 +142,19 @@ class ClusterCachesInfo {
      * @throws IgniteCheckedException If failed.
      */
     void onKernalStart(boolean checkConsistency) throws IgniteCheckedException {
+        if (gridData != null && gridData.conflictErr != null)
+            throw new IgniteCheckedException(gridData.conflictErr);
+
         if (checkConsistency && joinDiscoData != null && gridData != null) {
             for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) {
                 CacheConfiguration locCfg = locCacheInfo.config();
 
-                CacheData cacheData = gridData.caches().get(locCfg.getName());
+                CacheData cacheData = gridData.gridData.caches().get(locCfg.getName());
 
                 if (cacheData != null)
                     checkCache(locCacheInfo, cacheData, cacheData.receivedFrom());
+
+                validateStartCacheConfiguration(locCfg);
             }
         }
 
@@ -138,6 +178,9 @@ class ClusterCachesInfo {
         CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode",
             locAttr.cacheMode(), rmtAttr.cacheMode(), true);
 
+        CU.checkAttributeMismatch(log, rmtAttr.groupName(), rmt, "groupName", "Cache group name",
+            locAttr.groupName(), rmtAttr.groupName(), true);
+
         CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "sql", "SQL flag",
             locAttr.sql(), rmtAttr.sql(), true);
 
@@ -151,6 +194,9 @@ class ClusterCachesInfo {
             CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode",
                 "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true);
 
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "topologyValidator",
+                "Cache topology validator", locAttr.topologyValidatorClassName(), rmtAttr.topologyValidatorClassName(), true);
+
             ClusterNode rmtNode = ctx.discovery().node(rmt);
 
             if (CU.affinityNode(ctx.discovery().localNode(), locInfo.config().getNodeFilter())
@@ -251,6 +297,7 @@ class ClusterCachesInfo {
                     DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx,
                         ccfg,
                         req.cacheType(),
+                        null,
                         true,
                         req.initiatingNodeId(),
                         false,
@@ -278,13 +325,24 @@ class ClusterCachesInfo {
 
             if (req.start()) {
                 if (desc == null) {
+                    String conflictErr = checkCacheConflict(req.startCacheConfiguration());
+
+                    if (conflictErr != null) {
+                        U.warn(log, "Ignore cache start request. " + conflictErr);
+
+                        ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " +
+                            "cache. " + conflictErr));
+
+                        continue;
+                    }
+
                     if (req.clientStartOnly()) {
                         ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " +
                             "client cache (a cache with the given name is not started): " + req.cacheName()));
                     }
                     else {
                         SchemaOperationException err = QueryUtils.checkQueryEntityConflicts(
-                            req.startCacheConfiguration(), ctx.cache().cacheDescriptors());
+                            req.startCacheConfiguration(), ctx.cache().cacheDescriptors().values());
 
                         if (err != null) {
                             ctx.cache().completeCacheStartFuture(req, false, err);
@@ -297,9 +355,19 @@ class ClusterCachesInfo {
                         assert req.cacheType() != null : req;
                         assert F.eq(ccfg.getName(), req.cacheName()) : req;
 
+                        int cacheId = CU.cacheId(req.cacheName());
+
+                        CacheGroupDescriptor grpDesc = registerCacheGroup(exchangeActions,
+                            topVer,
+                            ccfg,
+                            cacheId,
+                            req.initiatingNodeId(),
+                            req.deploymentId());
+
                         DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
                             ccfg,
                             req.cacheType(),
+                            grpDesc,
                             false,
                             req.initiatingNodeId(),
                             false,
@@ -312,10 +380,9 @@ class ClusterCachesInfo {
                         assert old == null;
 
                         ctx.discovery().setCacheFilter(
+                            grpDesc.groupId(),
                             ccfg.getName(),
-                            ccfg.getNodeFilter(),
-                            ccfg.getNearConfiguration() != null,
-                            ccfg.getCacheMode());
+                            ccfg.getNearConfiguration() != null);
 
                         ctx.discovery().addClientNode(req.cacheName(),
                             req.initiatingNodeId(),
@@ -391,8 +458,6 @@ class ClusterCachesInfo {
                 }
             }
             else if (req.stop()) {
-                assert req.stop() ^ req.close() : req;
-
                 if (desc != null) {
                     if (req.sql() && !desc.sql()) {
                         ctx.cache().completeCacheStartFuture(req, false,
@@ -412,13 +477,27 @@ class ClusterCachesInfo {
 
                     DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName());
 
-                    assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
+                    assert old != null && old == desc : "Dynamic cache map was concurrently modified [req=" + req + ']';
 
                     ctx.discovery().removeCacheFilter(req.cacheName());
 
                     needExchange = true;
 
                     exchangeActions.addCacheToStop(req, desc);
+
+                    CacheGroupDescriptor grpDesc = registeredCacheGrps.get(desc.groupId());
+
+                    assert grpDesc != null && grpDesc.groupId() == desc.groupId() : desc;
+
+                    grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId());
+
+                    if (!grpDesc.hasCaches()) {
+                        registeredCacheGrps.remove(grpDesc.groupId());
+
+                        ctx.discovery().removeCacheGroup(grpDesc);
+
+                        exchangeActions.addCacheGroupToStop(grpDesc);
+                    }
                 }
             }
             else if (req.close()) {
@@ -498,10 +577,24 @@ class ClusterCachesInfo {
      */
     private Serializable joinDiscoveryData() {
         if (cachesOnDisconnect != null) {
+            Map<Integer, CacheClientReconnectDiscoveryData.CacheGroupInfo> cacheGrpsInfo = new HashMap<>();
             Map<String, CacheClientReconnectDiscoveryData.CacheInfo> cachesInfo = new HashMap<>();
 
+            Map<Integer, CacheGroupDescriptor> grps = cachesOnDisconnect.cacheGrps;
+            Map<String, DynamicCacheDescriptor> caches = cachesOnDisconnect.caches;
+
+            for (CacheGroupContext grp : ctx.cache().cacheGroups()) {
+                CacheGroupDescriptor desc = grps.get(grp.groupId());
+
+                assert desc != null : grp.cacheOrGroupName();
+
+                cacheGrpsInfo.put(grp.groupId(), new CacheClientReconnectDiscoveryData.CacheGroupInfo(desc.config(),
+                    desc.deploymentId(),
+                    0));
+            }
+
             for (IgniteInternalCache cache : ctx.cache().caches()) {
-                DynamicCacheDescriptor desc = cachesOnDisconnect.get(cache.name());
+                DynamicCacheDescriptor desc = caches.get(cache.name());
 
                 assert desc != null : cache.name();
 
@@ -509,10 +602,10 @@ class ClusterCachesInfo {
                     desc.cacheType(),
                     desc.deploymentId(),
                     cache.context().isNear(),
-                    (byte)0));
+                    0));
             }
 
-            return new CacheClientReconnectDiscoveryData(cachesInfo);
+            return new CacheClientReconnectDiscoveryData(cacheGrpsInfo, cachesInfo);
         }
         else {
             assert ctx.config().isDaemon() || joinDiscoData != null || !ctx.state().active();
@@ -575,6 +668,11 @@ class ClusterCachesInfo {
      */
     void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
         if (type == EVT_NODE_JOINED && !ctx.isDaemon()) {
+            for (CacheGroupDescriptor desc : registeredCacheGrps.values()) {
+                if (node.id().equals(desc.receivedFrom()))
+                    desc.receivedFromStartVersion(topVer);
+            }
+
             for (DynamicCacheDescriptor desc : registeredCaches.values()) {
                 if (node.id().equals(desc.receivedFrom()))
                     desc.receivedFromStartVersion(topVer);
@@ -610,11 +708,27 @@ class ClusterCachesInfo {
      * @return Information about started caches.
      */
     private CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
+        Map<Integer, CacheGroupData> cacheGrps = new HashMap<>();
+
+        for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
+            CacheGroupData grpData = new CacheGroupData(grpDesc.config(),
+                grpDesc.groupName(),
+                grpDesc.groupId(),
+                grpDesc.receivedFrom(),
+                grpDesc.startTopologyVersion(),
+                grpDesc.deploymentId(),
+                grpDesc.caches(),
+                0);
+
+            cacheGrps.put(grpDesc.groupId(), grpData);
+        }
+
         Map<String, CacheData> caches = new HashMap<>();
 
         for (DynamicCacheDescriptor desc : registeredCaches.values()) {
             CacheData cacheData = new CacheData(desc.cacheConfiguration(),
                 desc.cacheId(),
+                desc.groupId(),
                 desc.cacheType(),
                 desc.deploymentId(),
                 desc.schema(),
@@ -622,7 +736,7 @@ class ClusterCachesInfo {
                 desc.staticallyConfigured(),
                 desc.sql(),
                 false,
-                (byte)0);
+                0);
 
             caches.put(desc.cacheName(), cacheData);
         }
@@ -632,6 +746,7 @@ class ClusterCachesInfo {
         for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
             CacheData cacheData = new CacheData(desc.cacheConfiguration(),
                 0,
+                0,
                 desc.cacheType(),
                 desc.deploymentId(),
                 desc.schema(),
@@ -639,12 +754,15 @@ class ClusterCachesInfo {
                 desc.staticallyConfigured(),
                 false,
                 true,
-                (byte)0);
+                0);
 
             templates.put(desc.cacheName(), cacheData);
         }
 
-        return new CacheNodeCommonDiscoveryData(caches, templates, ctx.discovery().clientNodesMap());
+        return new CacheNodeCommonDiscoveryData(caches,
+            templates,
+            cacheGrps,
+            ctx.discovery().clientNodesMap());
     }
 
     /**
@@ -659,11 +777,36 @@ class ClusterCachesInfo {
 
         CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData();
 
+        // Replace locally registered data with actual data received from cluster.
+        registeredCaches.clear();
+        registeredCacheGrps.clear();
+        ctx.discovery().onLocalNodeJoin();
+
+        for (CacheGroupData grpData : cachesData.cacheGroups().values()) {
+            CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
+                grpData.config(),
+                grpData.groupName(),
+                grpData.groupId(),
+                grpData.receivedFrom(),
+                grpData.startTopologyVersion(),
+                grpData.deploymentId(),
+                grpData.caches());
+
+            CacheGroupDescriptor old = registeredCacheGrps.put(grpDesc.groupId(), grpDesc);
+
+            assert old == null : old;
+
+            ctx.discovery().addCacheGroup(grpDesc,
+                grpData.config().getNodeFilter(),
+                grpData.config().getCacheMode());
+        }
+
         for (CacheData cacheData : cachesData.templates().values()) {
             DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
                 ctx,
                 cacheData.cacheConfiguration(),
                 cacheData.cacheType(),
+                null,
                 true,
                 cacheData.receivedFrom(),
                 cacheData.staticallyConfigured(),
@@ -675,12 +818,17 @@ class ClusterCachesInfo {
         }
 
         for (CacheData cacheData : cachesData.caches().values()) {
+            CacheGroupDescriptor grpDesc = registeredCacheGrps.get(cacheData.groupId());
+
+            assert grpDesc != null : cacheData.cacheConfiguration().getName();
+
             CacheConfiguration<?, ?> cfg = cacheData.cacheConfiguration();
 
             DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
                 ctx,
                 cacheData.cacheConfiguration(),
                 cacheData.cacheType(),
+                grpDesc,
                 false,
                 cacheData.receivedFrom(),
                 cacheData.staticallyConfigured(),
@@ -693,10 +841,9 @@ class ClusterCachesInfo {
             registeredCaches.put(cacheData.cacheConfiguration().getName(), desc);
 
             ctx.discovery().setCacheFilter(
+                grpDesc.groupId(),
                 cfg.getName(),
-                cfg.getNodeFilter(),
-                cfg.getNearConfiguration() != null,
-                cfg.getCacheMode());
+                cfg.getNearConfiguration() != null);
         }
 
         if (!F.isEmpty(cachesData.clientNodesMap())) {
@@ -708,7 +855,24 @@ class ClusterCachesInfo {
             }
         }
 
-        gridData = cachesData;
+        String conflictErr = null;
+
+        if (joinDiscoData != null) {
+            for (Map.Entry<String, CacheJoinNodeDiscoveryData.CacheInfo> e : joinDiscoData.caches().entrySet()) {
+                if (!registeredCaches.containsKey(e.getKey())) {
+                    conflictErr = checkCacheConflict(e.getValue().config());
+
+                    if (conflictErr != null) {
+                        conflictErr = "Failed to start configured cache due to conflict with started caches. " +
+                            conflictErr;
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        gridData = new GridData(cachesData, conflictErr);
 
         if (!disconnectedState())
             initStartCachesForLocalJoin(false);
@@ -741,6 +905,7 @@ class ClusterCachesInfo {
                     DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
                             locCfg.config(),
                             desc.cacheType(),
+                            desc.groupDescriptor(),
                             desc.template(),
                             desc.receivedFrom(),
                             desc.staticallyConfigured(),
@@ -755,7 +920,9 @@ class ClusterCachesInfo {
                     desc = desc0;
                 }
 
-                if (locCfg != null || joinDiscoData.startCaches() || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter())) {
+                if (locCfg != null ||
+                    joinDiscoData.startCaches() ||
+                    CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter())) {
                     // Move system and internal caches first.
                     if (desc.cacheType().userCache())
                         locJoinStartCaches.add(new T2<>(desc, nearCfg));
@@ -784,7 +951,7 @@ class ClusterCachesInfo {
                     processClientReconnectData((CacheClientReconnectDiscoveryData) joiningNodeData, data.joiningNodeId());
             }
             else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
-                processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId());
+                processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId(), false);
         }
     }
 
@@ -808,10 +975,59 @@ class ClusterCachesInfo {
     }
 
     /**
+     * @param cfg Cache configuration.
+     * @return {@code True} if validation passed.
+     */
+    private String checkCacheConflict(CacheConfiguration<?, ?> cfg) {
+        int cacheId = CU.cacheId(cfg.getName());
+
+        if (cacheGroupByName(cfg.getName()) != null)
+            return "Cache name conflict with existing cache group (change cache name) [cacheName=" + cfg.getName() + ']';
+
+        if (cfg.getGroupName() != null) {
+            DynamicCacheDescriptor desc = registeredCaches.get(cfg.getGroupName());
+
+            if (desc != null)
+                return "Cache group name conflict with existing cache (change group name) [cacheName=" + cfg.getName() +
+                    ", conflictingCacheName=" + desc.cacheName() + ']';
+        }
+
+        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            if (desc.cacheId() == cacheId)
+                return "Cache ID conflict (change cache name) [cacheName=" + cfg.getName() +
+                    ", conflictingCacheName=" + desc.cacheName() + ']';
+        }
+
+        int grpId = cacheGroupId(cfg.getName(), cfg.getGroupName());
+
+        if (cfg.getGroupName() != null) {
+            if (cacheGroupByName(cfg.getGroupName()) == null) {
+                CacheGroupDescriptor desc = registeredCacheGrps.get(grpId);
+
+                if (desc != null)
+                    return "Cache group ID conflict (change cache group name) [cacheName=" + cfg.getName() +
+                        ", groupName=" + cfg.getGroupName() +
+                        (desc.sharedGroup() ? ", conflictingGroupName=" : ", conflictingCacheName=") + desc.cacheOrGroupName() + ']';
+            }
+        }
+        else {
+            CacheGroupDescriptor desc = registeredCacheGrps.get(grpId);
+
+            if (desc != null)
+                return "Cache group ID conflict (change cache name) [cacheName=" + cfg.getName() +
+                    (desc.sharedGroup() ? ", conflictingGroupName=" : ", conflictingCacheName=") + desc.cacheOrGroupName() + ']';
+        }
+
+        return null;
+    }
+
+    /**
      * @param joinData Joined node discovery data.
      * @param nodeId Joined node ID.
+     * @param locJoin {@code True} if called on local node join.
+     * @return Configuration conflict error.
      */
-    private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
+    private String processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, boolean locJoin) {
         for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
             CacheConfiguration<?, ?> cfg = cacheInfo.config();
 
@@ -819,6 +1035,7 @@ class ClusterCachesInfo {
                 DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
                     cfg,
                     cacheInfo.cacheType(),
+                    null,
                     true,
                     nodeId,
                     true,
@@ -836,9 +1053,35 @@ class ClusterCachesInfo {
             CacheConfiguration<?, ?> cfg = cacheInfo.config();
 
             if (!registeredCaches.containsKey(cfg.getName())) {
+                String conflictErr = checkCacheConflict(cfg);
+
+                if (conflictErr != null) {
+                    if (locJoin)
+                        return conflictErr;
+
+                    U.warn(log, "Ignore cache received from joining node. " + conflictErr);
+
+                    continue;
+                }
+
+                int cacheId = CU.cacheId(cfg.getName());
+
+                CacheGroupDescriptor grpDesc = registerCacheGroup(null,
+                    null,
+                    cfg,
+                    cacheId,
+                    nodeId,
+                    joinData.cacheDeploymentId());
+
+                ctx.discovery().setCacheFilter(
+                    grpDesc.groupId(),
+                    cfg.getName(),
+                    cfg.getNearConfiguration() != null);
+
                 DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
                     cfg,
                     cacheInfo.cacheType(),
+                    grpDesc,
                     false,
                     nodeId,
                     true,
@@ -849,12 +1092,6 @@ class ClusterCachesInfo {
                 DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
 
                 assert old == null : old;
-
-                ctx.discovery().setCacheFilter(
-                    cfg.getName(),
-                    cfg.getNodeFilter(),
-                    cfg.getNearConfiguration() != null,
-                    cfg.getCacheMode());
             }
 
             ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null);
@@ -867,6 +1104,158 @@ class ClusterCachesInfo {
                     desc.cacheConfiguration().getNearConfiguration() != null);
             }
         }
+
+        return null;
+    }
+
+    /**
+     * @param grpName Group name.
+     * @return Group descriptor if group found.
+     */
+    @Nullable private CacheGroupDescriptor cacheGroupByName(String grpName) {
+        assert grpName != null;
+
+        for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
+            if (grpName.equals(grpDesc.groupName()))
+                return grpDesc;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Group descriptor.
+     */
+    @Nullable private CacheGroupDescriptor nonSharedCacheGroupByCacheName(String cacheName) {
+        assert cacheName != null;
+
+        for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
+            if (!grpDesc.sharedGroup() && grpDesc.caches().containsKey(cacheName))
+                return grpDesc;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param exchActions Optional exchange actions to update if new group was added.
+     * @param curTopVer Current topology version if dynamic cache started.
+     * @param startedCacheCfg Cache configuration.
+     * @param cacheId Cache ID.
+     * @param rcvdFrom Node ID cache was recived from.
+     * @param deploymentId Deployment ID.
+     * @return Group descriptor.
+     */
+    private CacheGroupDescriptor registerCacheGroup(
+        @Nullable ExchangeActions exchActions,
+        @Nullable AffinityTopologyVersion curTopVer,
+        CacheConfiguration<?, ?> startedCacheCfg,
+        Integer cacheId,
+        UUID rcvdFrom,
+        IgniteUuid deploymentId) {
+        if (startedCacheCfg.getGroupName() != null) {
+            CacheGroupDescriptor desc = cacheGroupByName(startedCacheCfg.getGroupName());
+
+            if (desc != null) {
+                desc.onCacheAdded(startedCacheCfg.getName(), cacheId);
+
+                return desc;
+            }
+        }
+
+        int grpId = cacheGroupId(startedCacheCfg.getName(), startedCacheCfg.getGroupName());
+
+        Map<String, Integer> caches = Collections.singletonMap(startedCacheCfg.getName(), cacheId);
+
+        CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
+            startedCacheCfg,
+            startedCacheCfg.getGroupName(),
+            grpId,
+            rcvdFrom,
+            curTopVer != null ? curTopVer.nextMinorVersion() : null,
+            deploymentId,
+            caches);
+
+        CacheGroupDescriptor old = registeredCacheGrps.put(grpId, grpDesc);
+
+        assert old == null : old;
+
+        ctx.discovery().addCacheGroup(grpDesc, grpDesc.config().getNodeFilter(), startedCacheCfg.getCacheMode());
+
+        if (exchActions != null)
+            exchActions.addCacheGroupToStart(grpDesc);
+
+        return grpDesc;
+    }
+
+    /**
+     * @return Registered cache groups.
+     */
+    ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGroups() {
+        return registeredCacheGrps;
+    }
+
+    /**
+     * @param ccfg Cache configuration to start.
+     * @throws IgniteCheckedException If failed.
+     */
+    void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException {
+        if (ccfg.getGroupName() != null) {
+            CacheGroupDescriptor grpDesc = cacheGroupByName(ccfg.getGroupName());
+
+            if (grpDesc != null) {
+                assert ccfg.getGroupName().equals(grpDesc.groupName());
+
+                validateCacheGroupConfiguration(grpDesc.config(), ccfg);
+            }
+        }
+    }
+
+    /**
+     * @param cfg Existing configuration.
+     * @param startCfg Cache configuration to start.
+     * @throws IgniteCheckedException If validation failed.
+     */
+    private void validateCacheGroupConfiguration(CacheConfiguration cfg, CacheConfiguration startCfg)
+        throws IgniteCheckedException {
+        GridCacheAttributes attr1 = new GridCacheAttributes(cfg, false);
+        GridCacheAttributes attr2 = new GridCacheAttributes(startCfg, false);
+
+        CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "cacheMode", "Cache mode",
+            cfg.getCacheMode(), startCfg.getCacheMode(), true);
+
+        CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "affinity", "Affinity function",
+            attr1.cacheAffinityClassName(), attr2.cacheAffinityClassName(), true);
+
+        CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "affinityPartitionsCount",
+            "Affinity partitions count", attr1.affinityPartitionsCount(), attr2.affinityPartitionsCount(), true);
+
+        CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "nodeFilter", "Node filter",
+            attr1.nodeFilterClassName(), attr2.nodeFilterClassName(), true);
+
+        CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "memoryPolicyName", "Memory policy",
+            cfg.getMemoryPolicyName(), startCfg.getMemoryPolicyName(), true);
+
+        CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "topologyValidator", "Topology validator",
+            attr1.topologyValidatorClassName(), attr2.topologyValidatorClassName(), true);
+
+        CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "partitionLossPolicy", "Partition Loss Policy",
+            cfg.getPartitionLossPolicy(), startCfg.getPartitionLossPolicy(), true);
+
+        CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "rebalanceMode", "Rebalance mode",
+            cfg.getRebalanceMode(), startCfg.getRebalanceMode(), true);
+
+        CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "rebalanceDelay", "Rebalance delay",
+            cfg.getRebalanceDelay(), startCfg.getRebalanceDelay(), false);
+
+        CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "rebalanceOrder", "Rebalance order",
+            cfg.getRebalanceOrder(), startCfg.getRebalanceOrder(), false);
+
+        if (cfg.getCacheMode() == PARTITIONED) {
+            CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "backups", "Backups",
+                cfg.getBackups(), startCfg.getBackups(), true);
+        }
     }
 
     /**
@@ -887,8 +1276,11 @@ class ClusterCachesInfo {
      *
      */
     void onDisconnect() {
-        cachesOnDisconnect = new HashMap<>(registeredCaches);
+        cachesOnDisconnect = new CachesOnDisconnect(
+            new HashMap<>(registeredCacheGrps),
+            new HashMap<>(registeredCaches));
 
+        registeredCacheGrps.clear();
         registeredCaches.clear();
         registeredTemplates.clear();
 
@@ -896,14 +1288,41 @@ class ClusterCachesInfo {
     }
 
     /**
-     * @return Stopped caches names.
+     * @return Information about stopped caches and cache groups.
      */
-    Set<String> onReconnected() {
+    ClusterCachesReconnectResult onReconnected() {
         assert disconnectedState();
 
         Set<String> stoppedCaches = new HashSet<>();
+        Set<Integer> stoppedCacheGrps = new HashSet<>();
+
+        for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet()) {
+            CacheGroupDescriptor locDesc = e.getValue();
+
+            CacheGroupDescriptor desc;
+            boolean stopped = true;
 
-        for(Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.entrySet()) {
+            if (locDesc.sharedGroup()) {
+                desc = cacheGroupByName(locDesc.groupName());
+
+                if (desc != null && desc.deploymentId().equals(locDesc.deploymentId()))
+                    stopped = false;
+            }
+            else {
+                desc = nonSharedCacheGroupByCacheName(locDesc.config().getName());
+
+                if (desc != null &&
+                    (surviveReconnect(locDesc.config().getName()) || desc.deploymentId().equals(locDesc.deploymentId())))
+                    stopped = false;
+            }
+
+            if (stopped)
+                stoppedCacheGrps.add(locDesc.groupId());
+            else
+                assert locDesc.groupId() == desc.groupId();
+        }
+
+        for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet()) {
             DynamicCacheDescriptor desc = e.getValue();
 
             String cacheName = e.getKey();
@@ -931,7 +1350,7 @@ class ClusterCachesInfo {
 
         cachesOnDisconnect = null;
 
-        return stoppedCaches;
+        return new ClusterCachesReconnectResult(stoppedCacheGrps, stoppedCaches);
     }
 
     /**
@@ -953,6 +1372,48 @@ class ClusterCachesInfo {
      *
      */
     void clearCaches() {
+        registeredCacheGrps.clear();
+
         registeredCaches.clear();
     }
+
+    /**
+     *
+     */
+    static class GridData {
+        /** */
+        private final CacheNodeCommonDiscoveryData gridData;
+
+        /** */
+        private final String conflictErr;
+
+        /**
+         * @param gridData Grid data.
+         * @param conflictErr Cache configuration conflict error.
+         */
+        GridData(CacheNodeCommonDiscoveryData gridData, String conflictErr) {
+            this.gridData = gridData;
+            this.conflictErr = conflictErr;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CachesOnDisconnect {
+        /** */
+        final Map<Integer, CacheGroupDescriptor> cacheGrps;
+
+        /** */
+        final Map<String, DynamicCacheDescriptor> caches;
+
+        /**
+         * @param cacheGrps Cache groups.
+         * @param caches Caches.
+         */
+        CachesOnDisconnect(Map<Integer, CacheGroupDescriptor> cacheGrps, Map<String, DynamicCacheDescriptor> caches) {
+            this.cacheGrps = cacheGrps;
+            this.caches = caches;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java
new file mode 100644
index 0000000..23854c3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Set;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+class ClusterCachesReconnectResult {
+    /** */
+    private final Set<Integer> stoppedCacheGrps;
+
+    /** */
+    private final Set<String> stoppedCaches;
+
+    /**
+     * @param stoppedCacheGrps Stopped cache groups.
+     * @param stoppedCaches Stopped caches.
+     */
+    ClusterCachesReconnectResult(Set<Integer> stoppedCacheGrps,
+        Set<String> stoppedCaches) {
+        this.stoppedCacheGrps = stoppedCacheGrps;
+        this.stoppedCaches = stoppedCaches;
+    }
+
+    /**
+     * @return Stopped cache groups.
+     */
+    Set<Integer> stoppedCacheGroups() {
+        return stoppedCacheGrps;
+    }
+
+    /**
+     * @return Stopped caches.
+     */
+    Set<String> stoppedCaches() {
+        return stoppedCaches;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ClusterCachesReconnectResult.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index a682f63..315013d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -93,10 +93,14 @@ public class DynamicCacheDescriptor {
     /** Current schema. */
     private QuerySchema schema;
 
+    /** */
+    private final CacheGroupDescriptor grpDesc;
+
     /**
      * @param ctx Context.
      * @param cacheCfg Cache configuration.
      * @param cacheType Cache type.
+     * @param grpDesc Group descriptor.
      * @param template {@code True} if this is template configuration.
      * @param rcvdFrom ID of node provided cache configuration
      * @param staticCfg {@code True} if cache statically configured.
@@ -108,6 +112,7 @@ public class DynamicCacheDescriptor {
     public DynamicCacheDescriptor(GridKernalContext ctx,
         CacheConfiguration cacheCfg,
         CacheType cacheType,
+        CacheGroupDescriptor grpDesc,
         boolean template,
         UUID rcvdFrom,
         boolean staticCfg,
@@ -115,6 +120,7 @@ public class DynamicCacheDescriptor {
         IgniteUuid deploymentId,
         QuerySchema schema) {
         assert cacheCfg != null;
+        assert grpDesc != null || template;
         assert schema != null;
 
         if (cacheCfg.getCacheMode() == CacheMode.REPLICATED && cacheCfg.getNearConfiguration() != null) {
@@ -125,6 +131,7 @@ public class DynamicCacheDescriptor {
 
         this.cacheCfg = cacheCfg;
         this.cacheType = cacheType;
+        this.grpDesc = grpDesc;
         this.template = template;
         this.rcvdFrom = rcvdFrom;
         this.staticCfg = staticCfg;
@@ -141,6 +148,24 @@ public class DynamicCacheDescriptor {
     }
 
     /**
+     * @return Cache group ID.
+     */
+    public int groupId() {
+        assert grpDesc != null : this;
+
+        return grpDesc.groupId();
+    }
+
+    /**
+     * @return Cache group descriptor.
+     */
+    public CacheGroupDescriptor groupDescriptor() {
+        assert grpDesc != null : this;
+
+        return grpDesc;
+    }
+
+    /**
      * @return Cache ID.
      */
     public Integer cacheId() {
@@ -203,6 +228,7 @@ public class DynamicCacheDescriptor {
      *
      * @param proc Object processor.
      * @return Cache object context.
+     * @throws IgniteCheckedException If failed.
      */
     public CacheObjectContext cacheObjectContext(IgniteCacheObjectProcessor proc) throws IgniteCheckedException {
         if (objCtx == null) {


Mime
View raw message