Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AA632200CAE for ; Tue, 6 Jun 2017 12:01:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A93BE160BC3; Tue, 6 Jun 2017 10:01:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A0C31160BF5 for ; Tue, 6 Jun 2017 12:01:25 +0200 (CEST) Received: (qmail 27924 invoked by uid 500); 6 Jun 2017 10:01:24 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 26929 invoked by uid 99); 6 Jun 2017 10:01:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Jun 2017 10:01:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C6F91E04F2; Tue, 6 Jun 2017 10:01:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Tue, 06 Jun 2017 10:01:48 -0000 Message-Id: <25f0824b8e334041be2cc9d2f20640ee@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [27/31] ignite git commit: ignite-5075 Implement logical 'cache groups' sharing the same physical caches archived-at: Tue, 06 Jun 2017 10:01:28 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java new file mode 100644 index 0000000..4844a55 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -0,0 +1,943 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataPageEvictionMode; +import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.events.CacheRebalancingEvent; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.database.MemoryPolicy; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; +import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheMode.LOCAL; +import static org.apache.ignite.cache.CacheRebalanceMode.NONE; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; + +/** + * + */ +public class CacheGroupContext { + /** + * Unique group ID. Currently for shared group it is generated as group name hash, + * for non-shared as cache name hash (see {@link ClusterCachesInfo#checkCacheConflict}). + */ + private final int grpId; + + /** Node ID cache group was received from. */ + private final UUID rcvdFrom; + + /** Flag indicating that this cache group is in a recovery mode due to partitions loss. */ + private boolean needsRecovery; + + /** */ + private final AffinityTopologyVersion locStartVer; + + /** */ + private final CacheConfiguration ccfg; + + /** */ + private final GridCacheSharedContext ctx; + + /** */ + private final boolean affNode; + + /** */ + private final CacheType cacheType; + + /** */ + private final byte ioPlc; + + /** */ + private final boolean depEnabled; + + /** */ + private final boolean storeCacheId; + + /** */ + private volatile List caches; + + /** */ + private volatile List contQryCaches; + + /** */ + private final IgniteLogger log; + + /** */ + private GridAffinityAssignmentCache aff; + + /** */ + private GridDhtPartitionTopologyImpl top; + + /** */ + private IgniteCacheOffheapManager offheapMgr; + + /** */ + private GridCachePreloader preldr; + + /** */ + private final MemoryPolicy memPlc; + + /** */ + private final CacheObjectContext cacheObjCtx; + + /** */ + private final FreeList freeList; + + /** */ + private final ReuseList reuseList; + + /** */ + private boolean drEnabled; + + /** */ + private boolean qryEnabled; + + /** + * @param grpId Group ID. + * @param ctx Context. + * @param rcvdFrom Node ID cache group was received from. + * @param cacheType Cache type. + * @param ccfg Cache configuration. + * @param affNode Affinity node flag. + * @param memPlc Memory policy. + * @param cacheObjCtx Cache object context. + * @param freeList Free list. + * @param reuseList Reuse list. + * @param locStartVer Topology version when group was started on local node. + */ + CacheGroupContext( + GridCacheSharedContext ctx, + int grpId, + UUID rcvdFrom, + CacheType cacheType, + CacheConfiguration ccfg, + boolean affNode, + MemoryPolicy memPlc, + CacheObjectContext cacheObjCtx, + FreeList freeList, + ReuseList reuseList, + AffinityTopologyVersion locStartVer) { + assert ccfg != null; + assert memPlc != null || !affNode; + assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']'; + + this.grpId = grpId; + this.rcvdFrom = rcvdFrom; + this.ctx = ctx; + this.ccfg = ccfg; + this.affNode = affNode; + this.memPlc = memPlc; + this.cacheObjCtx = cacheObjCtx; + this.freeList = freeList; + this.reuseList = reuseList; + this.locStartVer = locStartVer; + this.cacheType = cacheType; + + ioPlc = cacheType.ioPolicy(); + + depEnabled = ctx.kernalContext().deploy().enabled() && !ctx.kernalContext().cacheObjects().isBinaryEnabled(ccfg); + + storeCacheId = affNode && memPlc.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED; + + log = ctx.kernalContext().log(getClass()); + + caches = new ArrayList<>(); + } + + /** + * @return {@code True} if this is cache group for one of system caches. + */ + public boolean systemCache() { + return !sharedGroup() && CU.isSystemCache(ccfg.getName()); + } + + /** + * @return Node ID initiated cache group start. + */ + public UUID receivedFrom() { + return rcvdFrom; + } + + /** + * @return {@code True} if cacheId should be stored in data pages. + */ + public boolean storeCacheIdInDataPage() { + return storeCacheId; + } + + /** + * @return {@code True} if deployment is enabled. + */ + public boolean deploymentEnabled() { + return depEnabled; + } + + /** + * @return Preloader. + */ + public GridCachePreloader preloader() { + return preldr; + } + + /** + * @return IO policy for the given cache group. + */ + public byte ioPolicy() { + return ioPlc; + } + + /** + * @param cctx Cache context. + * @throws IgniteCheckedException If failed. + */ + void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException { + addCacheContext(cctx); + + offheapMgr.onCacheStarted(cctx); + } + + /** + * @param cacheName Cache name. + * @return {@code True} if group contains cache with given name. + */ + public boolean hasCache(String cacheName) { + List caches = this.caches; + + for (int i = 0; i < caches.size(); i++) { + if (caches.get(i).name().equals(cacheName)) + return true; + } + + return false; + } + + /** + * @param cctx Cache context. + */ + private void addCacheContext(GridCacheContext cctx) { + assert cacheType.userCache() == cctx.userCache() : cctx.name(); + assert grpId == cctx.groupId() : cctx.name(); + + ArrayList caches = new ArrayList<>(this.caches); + + assert sharedGroup() || caches.isEmpty(); + + boolean add = caches.add(cctx); + + assert add : cctx.name(); + + if (!qryEnabled && QueryUtils.isEnabled(cctx.config())) + qryEnabled = true; + + if (!drEnabled && cctx.isDrEnabled()) + drEnabled = true; + + this.caches = caches; + } + + /** + * @param cctx Cache context. + */ + private void removeCacheContext(GridCacheContext cctx) { + ArrayList caches = new ArrayList<>(this.caches); + + // It is possible cache was not added in case of errors on cache start. + for (Iterator it = caches.iterator(); it.hasNext();) { + GridCacheContext next = it.next(); + + if (next == cctx) { + assert sharedGroup() || caches.size() == 1 : caches.size(); + + it.remove(); + + break; + } + } + + if (QueryUtils.isEnabled(cctx.config())) { + boolean qryEnabled = false; + + for (int i = 0; i < caches.size(); i++) { + if (QueryUtils.isEnabled(caches.get(i).config())) { + qryEnabled = true; + + break; + } + } + + this.qryEnabled = qryEnabled; + } + + if (cctx.isDrEnabled()) { + boolean drEnabled = false; + + for (int i = 0; i < caches.size(); i++) { + if (caches.get(i).isDrEnabled()) { + drEnabled = true; + + break; + } + } + + this.drEnabled = drEnabled; + } + + this.caches = caches; + } + + /** + * @return Cache context if group contains single cache. + */ + public GridCacheContext singleCacheContext() { + List caches = this.caches; + + assert !sharedGroup() && caches.size() == 1; + + return caches.get(0); + } + + /** + * + */ + public void unwindUndeploys() { + List caches = this.caches; + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + cctx.deploy().unwind(cctx); + } + } + + /** + * @param type Event type to check. + * @return {@code True} if given event type should be recorded. + */ + public boolean eventRecordable(int type) { + return cacheType.userCache() && ctx.gridEvents().isRecordable(type); + } + + /** + * Adds rebalancing event. + * + * @param part Partition. + * @param type Event type. + * @param discoNode Discovery node. + * @param discoType Discovery event type. + * @param discoTs Discovery event timestamp. + */ + public void addRebalanceEvent(int part, int type, ClusterNode discoNode, int discoType, long discoTs) { + assert discoNode != null; + assert type > 0; + assert discoType > 0; + assert discoTs > 0; + + if (!eventRecordable(type)) + LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type)); + + List caches = this.caches; + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + if (cctx.recordEvent(type)) { + cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), + cctx.localNode(), + "Cache rebalancing event.", + type, + part, + discoNode, + discoType, + discoTs)); + } + } + } + /** + * Adds partition unload event. + * + * @param part Partition. + */ + public void addUnloadEvent(int part) { + if (!eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) + LT.warn(log, "Added event without checking if event is recordable: " + + U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED)); + + List caches = this.caches; + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), + cctx.localNode(), + "Cache unloading event.", + EVT_CACHE_REBALANCE_PART_UNLOADED, + part, + null, + 0, + 0)); + } + } + + /** + * @param part Partition. + * @param key Key. + * @param evtNodeId Event node ID. + * @param type Event type. + * @param newVal New value. + * @param hasNewVal Has new value flag. + * @param oldVal Old values. + * @param hasOldVal Has old value flag. + * @param keepBinary Keep binary flag. + */ + public void addCacheEvent( + int part, + KeyCacheObject key, + UUID evtNodeId, + int type, + @Nullable CacheObject newVal, + boolean hasNewVal, + @Nullable CacheObject oldVal, + boolean hasOldVal, + boolean keepBinary + ) { + List caches = this.caches; + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + cctx.events().addEvent(part, + key, + evtNodeId, + (IgniteUuid)null, + null, + type, + newVal, + hasNewVal, + oldVal, + hasOldVal, + null, + null, + null, + keepBinary); + } + } + + /** + * @return {@code True} if contains cache with query indexing enabled. + */ + public boolean queriesEnabled() { + return qryEnabled; + } + + /** + * @return {@code True} if fast eviction is allowed. + */ + public boolean allowFastEviction() { + return ctx.database().persistenceEnabled() && !queriesEnabled(); + } + + /** + * @return {@code True} in case replication is enabled. + */ + public boolean isDrEnabled() { + return drEnabled; + } + + /** + * @return Free List. + */ + public FreeList freeList() { + return freeList; + } + + /** + * @return Reuse List. + */ + public ReuseList reuseList() { + return reuseList; + } + + /** + * @return Cache object context. + */ + public CacheObjectContext cacheObjectContext() { + return cacheObjCtx; + } + + /** + * @return Cache shared context. + */ + public GridCacheSharedContext shared() { + return ctx; + } + + /** + * @return Memory policy. + */ + public MemoryPolicy memoryPolicy() { + return memPlc; + } + + /** + * @return {@code True} if local node is affinity node. + */ + public boolean affinityNode() { + return affNode; + } + + /** + * @return Topology. + */ + public GridDhtPartitionTopology topology() { + if (top == null) + throw new IllegalStateException("Topology is not initialized: " + name()); + + return top; + } + + /** + * @return Offheap manager. + */ + public IgniteCacheOffheapManager offheap() { + return offheapMgr; + } + + /** + * @return Current cache state. Must only be modified during exchange. + */ + public boolean needsRecovery() { + return needsRecovery; + } + + /** + * @param needsRecovery Needs recovery flag. + */ + public void needsRecovery(boolean needsRecovery) { + this.needsRecovery = needsRecovery; + } + + /** + * @return Topology version when group was started on local node. + */ + public AffinityTopologyVersion localStartVersion() { + return locStartVer; + } + + /** + * @return {@code True} if cache is local. + */ + public boolean isLocal() { + return ccfg.getCacheMode() == LOCAL; + } + + /** + * @return Cache configuration. + */ + public CacheConfiguration config() { + return ccfg; + } + + /** + * @return Cache node filter. + */ + public IgnitePredicate nodeFilter() { + return ccfg.getNodeFilter(); + } + + /** + * @return Configured user objects which should be initialized/stopped on group start/stop. + */ + Collection configuredUserObjects() { + return Arrays.asList(ccfg.getAffinity(), ccfg.getNodeFilter(), ccfg.getTopologyValidator()); + } + + /** + * @return Configured topology validator. + */ + @Nullable public TopologyValidator topologyValidator() { + return ccfg.getTopologyValidator(); + } + + /** + * @return Configured affinity function. + */ + public AffinityFunction affinityFunction() { + return ccfg.getAffinity(); + } + + /** + * @return Affinity. + */ + public GridAffinityAssignmentCache affinity() { + return aff; + } + + /** + * @return Group name or {@code null} if group name was not specified for cache. + */ + @Nullable public String name() { + return ccfg.getGroupName(); + } + + /** + * @return Group name if it is specified, otherwise cache name. + */ + public String cacheOrGroupName() { + return ccfg.getGroupName() != null ? ccfg.getGroupName() : ccfg.getName(); + } + + /** + * @return Group ID. + */ + public int groupId() { + return grpId; + } + + /** + * @return {@code True} if group can contain multiple caches. + */ + public boolean sharedGroup() { + return ccfg.getGroupName() != null; + } + + /** + * + */ + public void onKernalStop() { + aff.cancelFutures(new IgniteCheckedException("Failed to wait for topology update, node is stopping.")); + + preldr.onKernalStop(); + + offheapMgr.onKernalStop(); + } + + /** + * @param cctx Cache context. + * @param destroy Destroy flag. + */ + void stopCache(GridCacheContext cctx, boolean destroy) { + if (top != null) + top.onCacheStopped(cctx.cacheId()); + + offheapMgr.stopCache(cctx.cacheId(), destroy); + + removeCacheContext(cctx); + } + + /** + * + */ + void stopGroup() { + IgniteCheckedException err = + new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."); + + aff.cancelFutures(err); + + offheapMgr.stop(); + + ctx.io().removeCacheGroupHandlers(grpId); + } + + /** + * @return IDs of caches in this group. + */ + public Set cacheIds() { + List caches = this.caches; + + Set ids = U.newHashSet(caches.size()); + + for (int i = 0; i < caches.size(); i++) + ids.add(caches.get(i).cacheId()); + + return ids; + } + + /** + * @return {@code True} if group contains caches. + */ + boolean hasCaches() { + List caches = this.caches; + + return !caches.isEmpty(); + } + + /** + * @param part Partition ID. + */ + public void onPartitionEvicted(int part) { + List caches = this.caches; + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + if (cctx.isDrEnabled()) + cctx.dr().partitionEvicted(part); + + cctx.continuousQueries().onPartitionEvicted(part); + + cctx.dataStructures().onPartitionEvicted(part); + } + } + + /** + * @param cctx Cache context. + */ + public void addCacheWithContinuousQuery(GridCacheContext cctx) { + assert sharedGroup() : cacheOrGroupName(); + assert cctx.group() == this : cctx.name(); + assert !cctx.isLocal() : cctx.name(); + + synchronized (this) { + List contQryCaches = this.contQryCaches; + + if (contQryCaches == null) + contQryCaches = new ArrayList<>(); + + contQryCaches.add(cctx); + + this.contQryCaches = contQryCaches; + } + } + + /** + * @param cctx Cache context. + */ + public void removeCacheWithContinuousQuery(GridCacheContext cctx) { + assert sharedGroup() : cacheOrGroupName(); + assert cctx.group() == this : cctx.name(); + assert !cctx.isLocal() : cctx.name(); + + synchronized (this) { + List contQryCaches = this.contQryCaches; + + if (contQryCaches == null) + return; + + contQryCaches.remove(cctx); + + if (contQryCaches.isEmpty()) + contQryCaches = null; + + this.contQryCaches = contQryCaches; + } + } + + /** + * @param cacheId ID of cache initiated counter update. + * @param part Partition number. + * @param cntr Counter. + * @param topVer Topology version for current operation. + */ + public void onPartitionCounterUpdate(int cacheId, + int part, + long cntr, + AffinityTopologyVersion topVer, + boolean primary) { + assert sharedGroup(); + + if (isLocal()) + return; + + List contQryCaches = this.contQryCaches; + + if (contQryCaches == null) + return; + + CounterSkipContext skipCtx = null; + + for (int i = 0; i < contQryCaches.size(); i++) { + GridCacheContext cctx = contQryCaches.get(i); + + if (cacheId != cctx.cacheId()) + skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer, primary); + } + + final List procC = skipCtx != null ? skipCtx.processClosures() : null; + + if (procC != null) { + ctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + for (Runnable c : procC) + c.run(); + } + }); + } + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void start() throws IgniteCheckedException { + aff = new GridAffinityAssignmentCache(ctx.kernalContext(), + cacheOrGroupName(), + grpId, + ccfg.getAffinity(), + ccfg.getNodeFilter(), + ccfg.getBackups(), + ccfg.getCacheMode() == LOCAL); + + if (ccfg.getCacheMode() != LOCAL) { + top = new GridDhtPartitionTopologyImpl(ctx, this); + + if (!ctx.kernalContext().clientNode()) { + ctx.io().addCacheGroupHandler(groupId(), GridDhtAffinityAssignmentRequest.class, + new IgniteBiInClosure() { + @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentRequest msg) { + processAffinityAssignmentRequest(nodeId, msg); + } + }); + } + + preldr = new GridDhtPreloader(this); + + preldr.start(); + } + else + preldr = new GridCachePreloaderAdapter(this); + + offheapMgr = new IgniteCacheOffheapManagerImpl(); + + offheapMgr.start(ctx, this); + + ctx.affinity().onCacheGroupCreated(this); + } + + /** + * @param nodeId Node ID. + * @param req Request. + */ + private void processAffinityAssignmentRequest(final UUID nodeId, + final GridDhtAffinityAssignmentRequest req) { + if (log.isDebugEnabled()) + log.debug("Processing affinity assignment request [node=" + nodeId + ", req=" + req + ']'); + + IgniteInternalFuture fut = aff.readyFuture(req.topologyVersion()); + + if (fut != null) { + fut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + processAffinityAssignmentRequest0(nodeId, req); + } + }); + } + else + processAffinityAssignmentRequest0(nodeId, req); + } + + /** + * @param nodeId Node ID. + * @param req Request. + */ + private void processAffinityAssignmentRequest0(UUID nodeId, final GridDhtAffinityAssignmentRequest req) { + AffinityTopologyVersion topVer = req.topologyVersion(); + + if (log.isDebugEnabled()) + log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + + ", node=" + nodeId + ']'); + + AffinityAssignment assignment = aff.cachedAffinity(topVer); + + GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse( + req.futureId(), + grpId, + topVer, + assignment.assignment()); + + if (aff.centralizedAffinityFunction()) { + assert assignment.idealAssignment() != null; + + res.idealAffinityAssignment(assignment.idealAssignment()); + } + + try { + ctx.io().send(nodeId, res, AFFINITY_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send affinity assignment response to remote node [node=" + nodeId + ']', e); + } + } + + /** + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture reconnectFut) { + IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, + "Failed to wait for topology update, client disconnected."); + + if (aff != null) + aff.cancelFutures(err); + } + + /** + * @return {@code True} if rebalance is enabled. + */ + public boolean rebalanceEnabled() { + return ccfg.getRebalanceMode() != NONE; + } + + /** + * + */ + public void onReconnected() { + aff.onReconnected(); + + if (top != null) + top.onReconnected(); + + preldr.onReconnected(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "CacheGroupContext [grp=" + cacheOrGroupName() + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java new file mode 100644 index 0000000..a290caf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class CacheGroupData implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final int grpId; + + /** */ + private final String grpName; + + /** */ + private final AffinityTopologyVersion startTopVer; + + /** */ + private final UUID rcvdFrom; + + /** */ + private final IgniteUuid deploymentId; + + /** */ + private final CacheConfiguration cacheCfg; + + /** */ + @GridToStringInclude + private final Map caches; + + /** */ + private long flags; + + /** + * @param cacheCfg Cache configuration. + * @param grpName Group name. + * @param grpId Group ID. + * @param rcvdFrom Node ID cache group received from. + * @param startTopVer Start version for dynamically started group. + * @param deploymentId Deployment ID. + * @param caches Cache group caches. + */ + CacheGroupData( + CacheConfiguration cacheCfg, + @Nullable String grpName, + int grpId, + UUID rcvdFrom, + @Nullable AffinityTopologyVersion startTopVer, + IgniteUuid deploymentId, + Map caches, + long flags) { + assert cacheCfg != null; + assert grpId != 0; + assert deploymentId != null; + + this.cacheCfg = cacheCfg; + this.grpName = grpName; + this.grpId = grpId; + this.rcvdFrom = rcvdFrom; + this.startTopVer = startTopVer; + this.deploymentId = deploymentId; + this.caches = caches; + this.flags = flags; + } + + /** + * @return Start version for dynamically started group. + */ + @Nullable public AffinityTopologyVersion startTopologyVersion() { + return startTopVer; + } + + /** + * @return Node ID group was received from. + */ + public UUID receivedFrom() { + return rcvdFrom; + } + + /** + * @return Group name. + */ + @Nullable public String groupName() { + return grpName; + } + + /** + * @return Group ID. + */ + public int groupId() { + return grpId; + } + + /** + * @return Deployment ID. + */ + public IgniteUuid deploymentId() { + return deploymentId; + } + + /** + * @return Configuration. + */ + public CacheConfiguration config() { + return cacheCfg; + } + + /** + * @return Group caches. + */ + Map caches() { + return caches; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheGroupData.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java new file mode 100644 index 0000000..c4976e5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class CacheGroupDescriptor { + /** */ + private final int grpId; + + /** */ + private final String grpName; + + /** */ + private final AffinityTopologyVersion startTopVer; + + /** */ + private final UUID rcvdFrom; + + /** */ + private final IgniteUuid deploymentId; + + /** */ + @GridToStringExclude + private final CacheConfiguration cacheCfg; + + /** */ + @GridToStringInclude + private Map caches; + + /** */ + private AffinityTopologyVersion rcvdFromVer; + + /** + * @param cacheCfg Cache configuration. + * @param grpName Group name. + * @param grpId Group ID. + * @param rcvdFrom Node ID cache group received from. + * @param startTopVer Start version for dynamically started group. + * @param deploymentId Deployment ID. + * @param caches Cache group caches. + */ + CacheGroupDescriptor( + CacheConfiguration cacheCfg, + @Nullable String grpName, + int grpId, + UUID rcvdFrom, + @Nullable AffinityTopologyVersion startTopVer, + IgniteUuid deploymentId, + Map caches) { + assert cacheCfg != null; + assert grpId != 0; + + this.grpName = grpName; + this.grpId = grpId; + this.rcvdFrom = rcvdFrom; + this.startTopVer = startTopVer; + this.deploymentId = deploymentId; + this.cacheCfg = cacheCfg; + this.caches = caches; + } + + /** + * @return Node ID group was received from. + */ + public UUID receivedFrom() { + return rcvdFrom; + } + + /** + * @return Deployment ID. + */ + public IgniteUuid deploymentId() { + return deploymentId; + } + + /** + * @param cacheName Cache name + * @param cacheId Cache ID. + */ + void onCacheAdded(String cacheName, int cacheId) { + assert cacheName != null; + assert cacheId != 0 : cacheName; + + Map caches = new HashMap<>(this.caches); + + caches.put(cacheName, cacheId); + + this.caches = caches; + } + + /** + * @param cacheName Cache name + * @param cacheId Cache ID. + */ + void onCacheStopped(String cacheName, int cacheId) { + assert cacheName != null; + assert cacheId != 0; + + Map caches = new HashMap<>(this.caches); + + Integer rmvd = caches.remove(cacheName); + + assert rmvd != null && rmvd == cacheId : cacheName; + + this.caches = caches; + } + + /** + * @return {@code True} if group contains cache. + */ + boolean hasCaches() { + return caches != null && !caches.isEmpty(); + } + + /** + * @return {@code True} if group can contain multiple caches. + */ + public boolean sharedGroup() { + return grpName != null; + } + + /** + * @return Group name if it is specified, otherwise cache name. + */ + public String cacheOrGroupName() { + return grpName != null ? grpName : cacheCfg.getName(); + } + + /** + * @return Group name or {@code null} if group name was not specified for cache. + */ + @Nullable public String groupName() { + return grpName; + } + + /** + * @return Group ID. + */ + public int groupId() { + return grpId; + } + + /** + * @return Configuration. + */ + public CacheConfiguration config() { + return cacheCfg; + } + + /** + * @return Group caches. + */ + public Map caches() { + return caches; + } + + /** + * @return Topology version when node provided cache configuration was started. + */ + @Nullable AffinityTopologyVersion receivedFromStartVersion() { + return rcvdFromVer; + } + + /** + * @param rcvdFromVer Topology version when node provided cache configuration was started. + */ + void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) { + this.rcvdFromVer = rcvdFromVer; + } + + /** + * @return Start version for dynamically started group. + */ + @Nullable public AffinityTopologyVersion startTopologyVersion() { + return startTopVer; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheGroupDescriptor.class, this, "cacheName", cacheCfg.getName()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java index afc01c9..58c9d82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java @@ -111,7 +111,7 @@ class CacheJoinNodeDiscoveryData implements Serializable { private final boolean sql; /** Flags added for future usage. */ - private final byte flags; + private final long flags; /** * @param ccfg Cache configuration. @@ -119,7 +119,7 @@ class CacheJoinNodeDiscoveryData implements Serializable { * @param sql SQL flag - {@code true} if cache was created with {@code CREATE TABLE}. * @param flags Flags (for future usage). */ - CacheInfo(CacheConfiguration ccfg, CacheType cacheType, boolean sql, byte flags) { + CacheInfo(CacheConfiguration ccfg, CacheType cacheType, boolean sql, long flags) { this.ccfg = ccfg; this.cacheType = cacheType; this.sql = sql; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index d4ad8e4..c9ee641 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -195,7 +195,10 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public long getOffHeapPrimaryEntriesCount() { try { - return cctx.offheap().entriesCount(true, false, cctx.affinity().affinityTopologyVersion()); + return cctx.offheap().cacheEntriesCount(cctx.cacheId(), + true, + false, + cctx.affinity().affinityTopologyVersion()); } catch (IgniteCheckedException ignored) { return 0; @@ -205,7 +208,10 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public long getOffHeapBackupEntriesCount() { try { - return cctx.offheap().entriesCount(false, true, cctx.affinity().affinityTopologyVersion()); + return cctx.offheap().cacheEntriesCount(cctx.cacheId(), + false, + true, + cctx.affinity().affinityTopologyVersion()); } catch (IgniteCheckedException ignored) { return 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java index 84a33dc..4c70cb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java @@ -39,22 +39,41 @@ class CacheNodeCommonDiscoveryData implements Serializable { private final Map templates; /** */ + @GridToStringInclude + private final Map cacheGrps; + + /** */ private final Map> clientNodesMap; /** * @param caches Started caches. * @param templates Configured templates. + * @param cacheGrps Started cache groups. * @param clientNodesMap Information about cache client nodes. */ CacheNodeCommonDiscoveryData(Map caches, Map templates, + Map cacheGrps, Map> clientNodesMap) { + assert caches != null; + assert templates != null; + assert cacheGrps != null; + assert clientNodesMap != null; + this.caches = caches; this.templates = templates; + this.cacheGrps = cacheGrps; this.clientNodesMap = clientNodesMap; } /** + * @return Started cache groups. + */ + Map cacheGroups() { + return cacheGrps; + } + + /** * @return Started caches. */ Map caches() { http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java index f8e9f32..d737c8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java @@ -41,15 +41,10 @@ public class CacheOffheapEvictionManager extends GridCacheManagerAdapter impleme return; try { - if (e.markObsoleteIfEmpty(null) || e.obsolete()) { - e.context().cache().removeEntry(e); + boolean evicted = e.evictInternal(GridCacheVersionManager.EVICT_VER, null, false) + || e.markObsoleteIfEmpty(null); - return; - } - - boolean evicted = e.evictInternal(GridCacheVersionManager.EVICT_VER, null, false); - - if (evicted) + if (evicted && !e.isDht()) // GridDhtCacheEntry removes entry when obsoleted. cctx.cache().removeEntry(e); } catch (IgniteCheckedException ex) { http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index e4d2668..0fcc740 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -45,8 +45,11 @@ import org.apache.ignite.internal.processors.query.schema.SchemaOperationExcepti import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -63,6 +66,9 @@ class ClusterCachesInfo { /** Dynamic caches. */ private final ConcurrentMap registeredCaches = new ConcurrentHashMap<>(); + /** */ + private final ConcurrentMap registeredCacheGrps = new ConcurrentHashMap<>(); + /** Cache templates. */ private final ConcurrentMap registeredTemplates = new ConcurrentHashMap<>(); @@ -70,13 +76,13 @@ class ClusterCachesInfo { private final IgniteLogger log; /** */ - private Map cachesOnDisconnect; + private CachesOnDisconnect cachesOnDisconnect; /** */ private CacheJoinNodeDiscoveryData joinDiscoData; /** */ - private CacheNodeCommonDiscoveryData gridData; + private GridData gridData; /** */ private List> locJoinStartCaches; @@ -95,11 +101,40 @@ class ClusterCachesInfo { /** * @param joinDiscoData Information about configured caches and templates. + * @throws IgniteCheckedException If configuration validation failed. */ - void onStart(CacheJoinNodeDiscoveryData joinDiscoData) { + void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheckedException { this.joinDiscoData = joinDiscoData; - processJoiningNode(joinDiscoData, ctx.localNodeId()); + Map grpCfgs = new HashMap<>(); + + for (CacheJoinNodeDiscoveryData.CacheInfo info : joinDiscoData.caches().values()) { + if (info.config().getGroupName() == null) + continue; + + CacheConfiguration ccfg = grpCfgs.get(info.config().getGroupName()); + + if (ccfg == null) + grpCfgs.put(info.config().getGroupName(), info.config()); + else + validateCacheGroupConfiguration(ccfg, info.config()); + } + + String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true); + + if (conflictErr != null) + throw new IgniteCheckedException("Failed to start configured cache. " + conflictErr); + } + + /** + * @param cacheName Cache name. + * @param grpName Group name. + * @return Group ID. + */ + private int cacheGroupId(String cacheName, @Nullable String grpName) { + assert cacheName != null; + + return grpName != null ? CU.cacheId(grpName) : CU.cacheId(cacheName); } /** @@ -107,14 +142,19 @@ class ClusterCachesInfo { * @throws IgniteCheckedException If failed. */ void onKernalStart(boolean checkConsistency) throws IgniteCheckedException { + if (gridData != null && gridData.conflictErr != null) + throw new IgniteCheckedException(gridData.conflictErr); + if (checkConsistency && joinDiscoData != null && gridData != null) { for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) { CacheConfiguration locCfg = locCacheInfo.config(); - CacheData cacheData = gridData.caches().get(locCfg.getName()); + CacheData cacheData = gridData.gridData.caches().get(locCfg.getName()); if (cacheData != null) checkCache(locCacheInfo, cacheData, cacheData.receivedFrom()); + + validateStartCacheConfiguration(locCfg); } } @@ -138,6 +178,9 @@ class ClusterCachesInfo { CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode", locAttr.cacheMode(), rmtAttr.cacheMode(), true); + CU.checkAttributeMismatch(log, rmtAttr.groupName(), rmt, "groupName", "Cache group name", + locAttr.groupName(), rmtAttr.groupName(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "sql", "SQL flag", locAttr.sql(), rmtAttr.sql(), true); @@ -151,6 +194,9 @@ class ClusterCachesInfo { CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode", "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "topologyValidator", + "Cache topology validator", locAttr.topologyValidatorClassName(), rmtAttr.topologyValidatorClassName(), true); + ClusterNode rmtNode = ctx.discovery().node(rmt); if (CU.affinityNode(ctx.discovery().localNode(), locInfo.config().getNodeFilter()) @@ -251,6 +297,7 @@ class ClusterCachesInfo { DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), + null, true, req.initiatingNodeId(), false, @@ -278,13 +325,24 @@ class ClusterCachesInfo { if (req.start()) { if (desc == null) { + String conflictErr = checkCacheConflict(req.startCacheConfiguration()); + + if (conflictErr != null) { + U.warn(log, "Ignore cache start request. " + conflictErr); + + ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " + + "cache. " + conflictErr)); + + continue; + } + if (req.clientStartOnly()) { ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " + "client cache (a cache with the given name is not started): " + req.cacheName())); } else { SchemaOperationException err = QueryUtils.checkQueryEntityConflicts( - req.startCacheConfiguration(), ctx.cache().cacheDescriptors()); + req.startCacheConfiguration(), ctx.cache().cacheDescriptors().values()); if (err != null) { ctx.cache().completeCacheStartFuture(req, false, err); @@ -297,9 +355,19 @@ class ClusterCachesInfo { assert req.cacheType() != null : req; assert F.eq(ccfg.getName(), req.cacheName()) : req; + int cacheId = CU.cacheId(req.cacheName()); + + CacheGroupDescriptor grpDesc = registerCacheGroup(exchangeActions, + topVer, + ccfg, + cacheId, + req.initiatingNodeId(), + req.deploymentId()); + DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), + grpDesc, false, req.initiatingNodeId(), false, @@ -312,10 +380,9 @@ class ClusterCachesInfo { assert old == null; ctx.discovery().setCacheFilter( + grpDesc.groupId(), ccfg.getName(), - ccfg.getNodeFilter(), - ccfg.getNearConfiguration() != null, - ccfg.getCacheMode()); + ccfg.getNearConfiguration() != null); ctx.discovery().addClientNode(req.cacheName(), req.initiatingNodeId(), @@ -391,8 +458,6 @@ class ClusterCachesInfo { } } else if (req.stop()) { - assert req.stop() ^ req.close() : req; - if (desc != null) { if (req.sql() && !desc.sql()) { ctx.cache().completeCacheStartFuture(req, false, @@ -412,13 +477,27 @@ class ClusterCachesInfo { DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName()); - assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']'; + assert old != null && old == desc : "Dynamic cache map was concurrently modified [req=" + req + ']'; ctx.discovery().removeCacheFilter(req.cacheName()); needExchange = true; exchangeActions.addCacheToStop(req, desc); + + CacheGroupDescriptor grpDesc = registeredCacheGrps.get(desc.groupId()); + + assert grpDesc != null && grpDesc.groupId() == desc.groupId() : desc; + + grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId()); + + if (!grpDesc.hasCaches()) { + registeredCacheGrps.remove(grpDesc.groupId()); + + ctx.discovery().removeCacheGroup(grpDesc); + + exchangeActions.addCacheGroupToStop(grpDesc); + } } } else if (req.close()) { @@ -498,10 +577,24 @@ class ClusterCachesInfo { */ private Serializable joinDiscoveryData() { if (cachesOnDisconnect != null) { + Map cacheGrpsInfo = new HashMap<>(); Map cachesInfo = new HashMap<>(); + Map grps = cachesOnDisconnect.cacheGrps; + Map caches = cachesOnDisconnect.caches; + + for (CacheGroupContext grp : ctx.cache().cacheGroups()) { + CacheGroupDescriptor desc = grps.get(grp.groupId()); + + assert desc != null : grp.cacheOrGroupName(); + + cacheGrpsInfo.put(grp.groupId(), new CacheClientReconnectDiscoveryData.CacheGroupInfo(desc.config(), + desc.deploymentId(), + 0)); + } + for (IgniteInternalCache cache : ctx.cache().caches()) { - DynamicCacheDescriptor desc = cachesOnDisconnect.get(cache.name()); + DynamicCacheDescriptor desc = caches.get(cache.name()); assert desc != null : cache.name(); @@ -509,10 +602,10 @@ class ClusterCachesInfo { desc.cacheType(), desc.deploymentId(), cache.context().isNear(), - (byte)0)); + 0)); } - return new CacheClientReconnectDiscoveryData(cachesInfo); + return new CacheClientReconnectDiscoveryData(cacheGrpsInfo, cachesInfo); } else { assert ctx.config().isDaemon() || joinDiscoData != null || !ctx.state().active(); @@ -575,6 +668,11 @@ class ClusterCachesInfo { */ void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { if (type == EVT_NODE_JOINED && !ctx.isDaemon()) { + for (CacheGroupDescriptor desc : registeredCacheGrps.values()) { + if (node.id().equals(desc.receivedFrom())) + desc.receivedFromStartVersion(topVer); + } + for (DynamicCacheDescriptor desc : registeredCaches.values()) { if (node.id().equals(desc.receivedFrom())) desc.receivedFromStartVersion(topVer); @@ -610,11 +708,27 @@ class ClusterCachesInfo { * @return Information about started caches. */ private CacheNodeCommonDiscoveryData collectCommonDiscoveryData() { + Map cacheGrps = new HashMap<>(); + + for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) { + CacheGroupData grpData = new CacheGroupData(grpDesc.config(), + grpDesc.groupName(), + grpDesc.groupId(), + grpDesc.receivedFrom(), + grpDesc.startTopologyVersion(), + grpDesc.deploymentId(), + grpDesc.caches(), + 0); + + cacheGrps.put(grpDesc.groupId(), grpData); + } + Map caches = new HashMap<>(); for (DynamicCacheDescriptor desc : registeredCaches.values()) { CacheData cacheData = new CacheData(desc.cacheConfiguration(), desc.cacheId(), + desc.groupId(), desc.cacheType(), desc.deploymentId(), desc.schema(), @@ -622,7 +736,7 @@ class ClusterCachesInfo { desc.staticallyConfigured(), desc.sql(), false, - (byte)0); + 0); caches.put(desc.cacheName(), cacheData); } @@ -632,6 +746,7 @@ class ClusterCachesInfo { for (DynamicCacheDescriptor desc : registeredTemplates.values()) { CacheData cacheData = new CacheData(desc.cacheConfiguration(), 0, + 0, desc.cacheType(), desc.deploymentId(), desc.schema(), @@ -639,12 +754,15 @@ class ClusterCachesInfo { desc.staticallyConfigured(), false, true, - (byte)0); + 0); templates.put(desc.cacheName(), cacheData); } - return new CacheNodeCommonDiscoveryData(caches, templates, ctx.discovery().clientNodesMap()); + return new CacheNodeCommonDiscoveryData(caches, + templates, + cacheGrps, + ctx.discovery().clientNodesMap()); } /** @@ -659,11 +777,36 @@ class ClusterCachesInfo { CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData(); + // Replace locally registered data with actual data received from cluster. + registeredCaches.clear(); + registeredCacheGrps.clear(); + ctx.discovery().onLocalNodeJoin(); + + for (CacheGroupData grpData : cachesData.cacheGroups().values()) { + CacheGroupDescriptor grpDesc = new CacheGroupDescriptor( + grpData.config(), + grpData.groupName(), + grpData.groupId(), + grpData.receivedFrom(), + grpData.startTopologyVersion(), + grpData.deploymentId(), + grpData.caches()); + + CacheGroupDescriptor old = registeredCacheGrps.put(grpDesc.groupId(), grpDesc); + + assert old == null : old; + + ctx.discovery().addCacheGroup(grpDesc, + grpData.config().getNodeFilter(), + grpData.config().getCacheMode()); + } + for (CacheData cacheData : cachesData.templates().values()) { DynamicCacheDescriptor desc = new DynamicCacheDescriptor( ctx, cacheData.cacheConfiguration(), cacheData.cacheType(), + null, true, cacheData.receivedFrom(), cacheData.staticallyConfigured(), @@ -675,12 +818,17 @@ class ClusterCachesInfo { } for (CacheData cacheData : cachesData.caches().values()) { + CacheGroupDescriptor grpDesc = registeredCacheGrps.get(cacheData.groupId()); + + assert grpDesc != null : cacheData.cacheConfiguration().getName(); + CacheConfiguration cfg = cacheData.cacheConfiguration(); DynamicCacheDescriptor desc = new DynamicCacheDescriptor( ctx, cacheData.cacheConfiguration(), cacheData.cacheType(), + grpDesc, false, cacheData.receivedFrom(), cacheData.staticallyConfigured(), @@ -693,10 +841,9 @@ class ClusterCachesInfo { registeredCaches.put(cacheData.cacheConfiguration().getName(), desc); ctx.discovery().setCacheFilter( + grpDesc.groupId(), cfg.getName(), - cfg.getNodeFilter(), - cfg.getNearConfiguration() != null, - cfg.getCacheMode()); + cfg.getNearConfiguration() != null); } if (!F.isEmpty(cachesData.clientNodesMap())) { @@ -708,7 +855,24 @@ class ClusterCachesInfo { } } - gridData = cachesData; + String conflictErr = null; + + if (joinDiscoData != null) { + for (Map.Entry e : joinDiscoData.caches().entrySet()) { + if (!registeredCaches.containsKey(e.getKey())) { + conflictErr = checkCacheConflict(e.getValue().config()); + + if (conflictErr != null) { + conflictErr = "Failed to start configured cache due to conflict with started caches. " + + conflictErr; + + break; + } + } + } + } + + gridData = new GridData(cachesData, conflictErr); if (!disconnectedState()) initStartCachesForLocalJoin(false); @@ -741,6 +905,7 @@ class ClusterCachesInfo { DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx, locCfg.config(), desc.cacheType(), + desc.groupDescriptor(), desc.template(), desc.receivedFrom(), desc.staticallyConfigured(), @@ -755,7 +920,9 @@ class ClusterCachesInfo { desc = desc0; } - if (locCfg != null || joinDiscoData.startCaches() || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter())) { + if (locCfg != null || + joinDiscoData.startCaches() || + CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter())) { // Move system and internal caches first. if (desc.cacheType().userCache()) locJoinStartCaches.add(new T2<>(desc, nearCfg)); @@ -784,7 +951,7 @@ class ClusterCachesInfo { processClientReconnectData((CacheClientReconnectDiscoveryData) joiningNodeData, data.joiningNodeId()); } else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) - processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId()); + processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId(), false); } } @@ -808,10 +975,59 @@ class ClusterCachesInfo { } /** + * @param cfg Cache configuration. + * @return {@code True} if validation passed. + */ + private String checkCacheConflict(CacheConfiguration cfg) { + int cacheId = CU.cacheId(cfg.getName()); + + if (cacheGroupByName(cfg.getName()) != null) + return "Cache name conflict with existing cache group (change cache name) [cacheName=" + cfg.getName() + ']'; + + if (cfg.getGroupName() != null) { + DynamicCacheDescriptor desc = registeredCaches.get(cfg.getGroupName()); + + if (desc != null) + return "Cache group name conflict with existing cache (change group name) [cacheName=" + cfg.getName() + + ", conflictingCacheName=" + desc.cacheName() + ']'; + } + + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + if (desc.cacheId() == cacheId) + return "Cache ID conflict (change cache name) [cacheName=" + cfg.getName() + + ", conflictingCacheName=" + desc.cacheName() + ']'; + } + + int grpId = cacheGroupId(cfg.getName(), cfg.getGroupName()); + + if (cfg.getGroupName() != null) { + if (cacheGroupByName(cfg.getGroupName()) == null) { + CacheGroupDescriptor desc = registeredCacheGrps.get(grpId); + + if (desc != null) + return "Cache group ID conflict (change cache group name) [cacheName=" + cfg.getName() + + ", groupName=" + cfg.getGroupName() + + (desc.sharedGroup() ? ", conflictingGroupName=" : ", conflictingCacheName=") + desc.cacheOrGroupName() + ']'; + } + } + else { + CacheGroupDescriptor desc = registeredCacheGrps.get(grpId); + + if (desc != null) + return "Cache group ID conflict (change cache name) [cacheName=" + cfg.getName() + + (desc.sharedGroup() ? ", conflictingGroupName=" : ", conflictingCacheName=") + desc.cacheOrGroupName() + ']'; + } + + return null; + } + + /** * @param joinData Joined node discovery data. * @param nodeId Joined node ID. + * @param locJoin {@code True} if called on local node join. + * @return Configuration conflict error. */ - private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) { + private String processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, boolean locJoin) { for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) { CacheConfiguration cfg = cacheInfo.config(); @@ -819,6 +1035,7 @@ class ClusterCachesInfo { DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheInfo.cacheType(), + null, true, nodeId, true, @@ -836,9 +1053,35 @@ class ClusterCachesInfo { CacheConfiguration cfg = cacheInfo.config(); if (!registeredCaches.containsKey(cfg.getName())) { + String conflictErr = checkCacheConflict(cfg); + + if (conflictErr != null) { + if (locJoin) + return conflictErr; + + U.warn(log, "Ignore cache received from joining node. " + conflictErr); + + continue; + } + + int cacheId = CU.cacheId(cfg.getName()); + + CacheGroupDescriptor grpDesc = registerCacheGroup(null, + null, + cfg, + cacheId, + nodeId, + joinData.cacheDeploymentId()); + + ctx.discovery().setCacheFilter( + grpDesc.groupId(), + cfg.getName(), + cfg.getNearConfiguration() != null); + DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheInfo.cacheType(), + grpDesc, false, nodeId, true, @@ -849,12 +1092,6 @@ class ClusterCachesInfo { DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc); assert old == null : old; - - ctx.discovery().setCacheFilter( - cfg.getName(), - cfg.getNodeFilter(), - cfg.getNearConfiguration() != null, - cfg.getCacheMode()); } ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null); @@ -867,6 +1104,158 @@ class ClusterCachesInfo { desc.cacheConfiguration().getNearConfiguration() != null); } } + + return null; + } + + /** + * @param grpName Group name. + * @return Group descriptor if group found. + */ + @Nullable private CacheGroupDescriptor cacheGroupByName(String grpName) { + assert grpName != null; + + for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) { + if (grpName.equals(grpDesc.groupName())) + return grpDesc; + } + + return null; + } + + /** + * @param cacheName Cache name. + * @return Group descriptor. + */ + @Nullable private CacheGroupDescriptor nonSharedCacheGroupByCacheName(String cacheName) { + assert cacheName != null; + + for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) { + if (!grpDesc.sharedGroup() && grpDesc.caches().containsKey(cacheName)) + return grpDesc; + } + + return null; + } + + /** + * @param exchActions Optional exchange actions to update if new group was added. + * @param curTopVer Current topology version if dynamic cache started. + * @param startedCacheCfg Cache configuration. + * @param cacheId Cache ID. + * @param rcvdFrom Node ID cache was recived from. + * @param deploymentId Deployment ID. + * @return Group descriptor. + */ + private CacheGroupDescriptor registerCacheGroup( + @Nullable ExchangeActions exchActions, + @Nullable AffinityTopologyVersion curTopVer, + CacheConfiguration startedCacheCfg, + Integer cacheId, + UUID rcvdFrom, + IgniteUuid deploymentId) { + if (startedCacheCfg.getGroupName() != null) { + CacheGroupDescriptor desc = cacheGroupByName(startedCacheCfg.getGroupName()); + + if (desc != null) { + desc.onCacheAdded(startedCacheCfg.getName(), cacheId); + + return desc; + } + } + + int grpId = cacheGroupId(startedCacheCfg.getName(), startedCacheCfg.getGroupName()); + + Map caches = Collections.singletonMap(startedCacheCfg.getName(), cacheId); + + CacheGroupDescriptor grpDesc = new CacheGroupDescriptor( + startedCacheCfg, + startedCacheCfg.getGroupName(), + grpId, + rcvdFrom, + curTopVer != null ? curTopVer.nextMinorVersion() : null, + deploymentId, + caches); + + CacheGroupDescriptor old = registeredCacheGrps.put(grpId, grpDesc); + + assert old == null : old; + + ctx.discovery().addCacheGroup(grpDesc, grpDesc.config().getNodeFilter(), startedCacheCfg.getCacheMode()); + + if (exchActions != null) + exchActions.addCacheGroupToStart(grpDesc); + + return grpDesc; + } + + /** + * @return Registered cache groups. + */ + ConcurrentMap registeredCacheGroups() { + return registeredCacheGrps; + } + + /** + * @param ccfg Cache configuration to start. + * @throws IgniteCheckedException If failed. + */ + void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException { + if (ccfg.getGroupName() != null) { + CacheGroupDescriptor grpDesc = cacheGroupByName(ccfg.getGroupName()); + + if (grpDesc != null) { + assert ccfg.getGroupName().equals(grpDesc.groupName()); + + validateCacheGroupConfiguration(grpDesc.config(), ccfg); + } + } + } + + /** + * @param cfg Existing configuration. + * @param startCfg Cache configuration to start. + * @throws IgniteCheckedException If validation failed. + */ + private void validateCacheGroupConfiguration(CacheConfiguration cfg, CacheConfiguration startCfg) + throws IgniteCheckedException { + GridCacheAttributes attr1 = new GridCacheAttributes(cfg, false); + GridCacheAttributes attr2 = new GridCacheAttributes(startCfg, false); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "cacheMode", "Cache mode", + cfg.getCacheMode(), startCfg.getCacheMode(), true); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "affinity", "Affinity function", + attr1.cacheAffinityClassName(), attr2.cacheAffinityClassName(), true); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "affinityPartitionsCount", + "Affinity partitions count", attr1.affinityPartitionsCount(), attr2.affinityPartitionsCount(), true); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "nodeFilter", "Node filter", + attr1.nodeFilterClassName(), attr2.nodeFilterClassName(), true); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "memoryPolicyName", "Memory policy", + cfg.getMemoryPolicyName(), startCfg.getMemoryPolicyName(), true); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "topologyValidator", "Topology validator", + attr1.topologyValidatorClassName(), attr2.topologyValidatorClassName(), true); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "partitionLossPolicy", "Partition Loss Policy", + cfg.getPartitionLossPolicy(), startCfg.getPartitionLossPolicy(), true); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "rebalanceMode", "Rebalance mode", + cfg.getRebalanceMode(), startCfg.getRebalanceMode(), true); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "rebalanceDelay", "Rebalance delay", + cfg.getRebalanceDelay(), startCfg.getRebalanceDelay(), false); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "rebalanceOrder", "Rebalance order", + cfg.getRebalanceOrder(), startCfg.getRebalanceOrder(), false); + + if (cfg.getCacheMode() == PARTITIONED) { + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "backups", "Backups", + cfg.getBackups(), startCfg.getBackups(), true); + } } /** @@ -887,8 +1276,11 @@ class ClusterCachesInfo { * */ void onDisconnect() { - cachesOnDisconnect = new HashMap<>(registeredCaches); + cachesOnDisconnect = new CachesOnDisconnect( + new HashMap<>(registeredCacheGrps), + new HashMap<>(registeredCaches)); + registeredCacheGrps.clear(); registeredCaches.clear(); registeredTemplates.clear(); @@ -896,14 +1288,41 @@ class ClusterCachesInfo { } /** - * @return Stopped caches names. + * @return Information about stopped caches and cache groups. */ - Set onReconnected() { + ClusterCachesReconnectResult onReconnected() { assert disconnectedState(); Set stoppedCaches = new HashSet<>(); + Set stoppedCacheGrps = new HashSet<>(); + + for (Map.Entry e : cachesOnDisconnect.cacheGrps.entrySet()) { + CacheGroupDescriptor locDesc = e.getValue(); + + CacheGroupDescriptor desc; + boolean stopped = true; - for(Map.Entry e : cachesOnDisconnect.entrySet()) { + if (locDesc.sharedGroup()) { + desc = cacheGroupByName(locDesc.groupName()); + + if (desc != null && desc.deploymentId().equals(locDesc.deploymentId())) + stopped = false; + } + else { + desc = nonSharedCacheGroupByCacheName(locDesc.config().getName()); + + if (desc != null && + (surviveReconnect(locDesc.config().getName()) || desc.deploymentId().equals(locDesc.deploymentId()))) + stopped = false; + } + + if (stopped) + stoppedCacheGrps.add(locDesc.groupId()); + else + assert locDesc.groupId() == desc.groupId(); + } + + for (Map.Entry e : cachesOnDisconnect.caches.entrySet()) { DynamicCacheDescriptor desc = e.getValue(); String cacheName = e.getKey(); @@ -931,7 +1350,7 @@ class ClusterCachesInfo { cachesOnDisconnect = null; - return stoppedCaches; + return new ClusterCachesReconnectResult(stoppedCacheGrps, stoppedCaches); } /** @@ -953,6 +1372,48 @@ class ClusterCachesInfo { * */ void clearCaches() { + registeredCacheGrps.clear(); + registeredCaches.clear(); } + + /** + * + */ + static class GridData { + /** */ + private final CacheNodeCommonDiscoveryData gridData; + + /** */ + private final String conflictErr; + + /** + * @param gridData Grid data. + * @param conflictErr Cache configuration conflict error. + */ + GridData(CacheNodeCommonDiscoveryData gridData, String conflictErr) { + this.gridData = gridData; + this.conflictErr = conflictErr; + } + } + + /** + * + */ + private static class CachesOnDisconnect { + /** */ + final Map cacheGrps; + + /** */ + final Map caches; + + /** + * @param cacheGrps Cache groups. + * @param caches Caches. + */ + CachesOnDisconnect(Map cacheGrps, Map caches) { + this.cacheGrps = cacheGrps; + this.caches = caches; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java new file mode 100644 index 0000000..23854c3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Set; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +class ClusterCachesReconnectResult { + /** */ + private final Set stoppedCacheGrps; + + /** */ + private final Set stoppedCaches; + + /** + * @param stoppedCacheGrps Stopped cache groups. + * @param stoppedCaches Stopped caches. + */ + ClusterCachesReconnectResult(Set stoppedCacheGrps, + Set stoppedCaches) { + this.stoppedCacheGrps = stoppedCacheGrps; + this.stoppedCaches = stoppedCaches; + } + + /** + * @return Stopped cache groups. + */ + Set stoppedCacheGroups() { + return stoppedCacheGrps; + } + + /** + * @return Stopped caches. + */ + Set stoppedCaches() { + return stoppedCaches; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClusterCachesReconnectResult.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index a682f63..315013d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -93,10 +93,14 @@ public class DynamicCacheDescriptor { /** Current schema. */ private QuerySchema schema; + /** */ + private final CacheGroupDescriptor grpDesc; + /** * @param ctx Context. * @param cacheCfg Cache configuration. * @param cacheType Cache type. + * @param grpDesc Group descriptor. * @param template {@code True} if this is template configuration. * @param rcvdFrom ID of node provided cache configuration * @param staticCfg {@code True} if cache statically configured. @@ -108,6 +112,7 @@ public class DynamicCacheDescriptor { public DynamicCacheDescriptor(GridKernalContext ctx, CacheConfiguration cacheCfg, CacheType cacheType, + CacheGroupDescriptor grpDesc, boolean template, UUID rcvdFrom, boolean staticCfg, @@ -115,6 +120,7 @@ public class DynamicCacheDescriptor { IgniteUuid deploymentId, QuerySchema schema) { assert cacheCfg != null; + assert grpDesc != null || template; assert schema != null; if (cacheCfg.getCacheMode() == CacheMode.REPLICATED && cacheCfg.getNearConfiguration() != null) { @@ -125,6 +131,7 @@ public class DynamicCacheDescriptor { this.cacheCfg = cacheCfg; this.cacheType = cacheType; + this.grpDesc = grpDesc; this.template = template; this.rcvdFrom = rcvdFrom; this.staticCfg = staticCfg; @@ -141,6 +148,24 @@ public class DynamicCacheDescriptor { } /** + * @return Cache group ID. + */ + public int groupId() { + assert grpDesc != null : this; + + return grpDesc.groupId(); + } + + /** + * @return Cache group descriptor. + */ + public CacheGroupDescriptor groupDescriptor() { + assert grpDesc != null : this; + + return grpDesc; + } + + /** * @return Cache ID. */ public Integer cacheId() { @@ -203,6 +228,7 @@ public class DynamicCacheDescriptor { * * @param proc Object processor. * @return Cache object context. + * @throws IgniteCheckedException If failed. */ public CacheObjectContext cacheObjectContext(IgniteCacheObjectProcessor proc) throws IgniteCheckedException { if (objCtx == null) {