Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 44E92200CA5 for ; Tue, 2 May 2017 17:08:06 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 43943160BA1; Tue, 2 May 2017 15:08:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 92D98160BAC for ; Tue, 2 May 2017 17:08:03 +0200 (CEST) Received: (qmail 88982 invoked by uid 500); 2 May 2017 15:08:02 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 88901 invoked by uid 99); 2 May 2017 15:08:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 May 2017 15:08:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 974EFDFD78; Tue, 2 May 2017 15:08:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Tue, 02 May 2017 15:08:03 -0000 Message-Id: <906620fca22547eb9db11afac9561392@git.apache.org> In-Reply-To: <5497be03beb942668b6a81e7ea52d1dc@git.apache.org> References: <5497be03beb942668b6a81e7ea52d1dc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/26] ignite git commit: cache discovery data refactoring archived-at: Tue, 02 May 2017 15:08:06 -0000 cache discovery data refactoring Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c73e1c90 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c73e1c90 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c73e1c90 Branch: refs/heads/ignite-5075 Commit: c73e1c90ba7fea972e95728b94078a69c35bafbd Parents: 4be320a Author: sboikov Authored: Wed Apr 26 14:01:40 2017 +0300 Committer: sboikov Committed: Wed Apr 26 19:38:44 2017 +0300 ---------------------------------------------------------------------- .../internal/processors/cache/CacheData.java | 152 ++++ .../cache/CacheJoinNodeDiscoveryData.java | 91 ++ .../cache/CacheNodeCommonDiscoveryData.java | 56 ++ .../CacheReconnectClientDiscoveryData.java | 26 + .../processors/cache/ClusterCachesInfo.java | 620 ++++++++++++++ .../cache/DynamicCacheDescriptor.java | 3 - .../processors/cache/GridCacheContext.java | 6 + .../processors/cache/GridCacheIoManager.java | 9 +- .../processors/cache/GridCacheProcessor.java | 834 +++++++------------ .../GridDhtPartitionsExchangeFuture.java | 9 +- 10 files changed, 1270 insertions(+), 536 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java new file mode 100644 index 0000000..4579c27 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.UUID; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.QuerySchema; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; + +/** + * + */ +public class CacheData implements Serializable { + /** */ + private final CacheConfiguration cacheCfg; + + /** */ + private final Integer cacheId; + + /** */ + private final CacheType cacheType; + + /** */ + private final AffinityTopologyVersion startTopVer; + + /** */ + private final IgniteUuid deploymentId; + + /** */ + private final QuerySchema schema; + + /** */ + private final UUID rcvdFrom; + + /** */ + private final boolean staticCfg; + + /** */ + private final boolean template; + + CacheData(CacheConfiguration cacheCfg, + int cacheId, + CacheType cacheType, + AffinityTopologyVersion startTopVer, + IgniteUuid deploymentId, + QuerySchema schema, + UUID rcvdFrom, + boolean staticCfg, + boolean template) { + assert cacheCfg != null; + assert rcvdFrom != null; + assert startTopVer != null; + assert deploymentId != null; + assert template || cacheId != 0; + + this.cacheCfg = cacheCfg; + this.cacheId = cacheId; + this.cacheType = cacheType; + this.startTopVer = startTopVer; + this.deploymentId = deploymentId; + this.schema = schema; + this.rcvdFrom = rcvdFrom; + this.staticCfg = staticCfg; + this.template = template; + } + + /** + * @return Cache ID. + */ + public Integer cacheId() { + return cacheId; + } + + /** + * @return Start topology version. + */ + public AffinityTopologyVersion startTopologyVersion() { + return startTopVer; + } + + /** + * @return {@code True} if this is template configuration. + */ + public boolean template() { + return template; + } + + /** + * @return Cache type. + */ + public CacheType cacheType() { + return cacheType; + } + + /** + * @return Start ID. + */ + public IgniteUuid deploymentId() { + return deploymentId; + } + + /** + * @return {@code True} if statically configured. + */ + public boolean staticallyConfigured() { + return staticCfg; + } + + /** + * @return Cache configuration. + */ + public CacheConfiguration cacheConfiguration() { + return cacheCfg; + } + + /** + * @return Schema. + */ + public QuerySchema schema() { + return schema.copy(); + } + + /** + * @return ID of node provided cache configuration. + */ + public UUID receivedFrom() { + return rcvdFrom; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheData.class, this, "cacheName", cacheCfg.getName()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java new file mode 100644 index 0000000..0624217 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.Map; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.lang.IgniteUuid; + +/** + * + */ +class CacheJoinNodeDiscoveryData implements Serializable { + /** */ + private final Map caches; + + /** */ + private final Map templates; + + /** */ + private final IgniteUuid cacheDeploymentId; + + /** + * @param cacheDeploymentId Deployment ID for started caches. + * @param caches Caches. + * @param templates Templates. + */ + CacheJoinNodeDiscoveryData( + IgniteUuid cacheDeploymentId, + Map caches, + Map templates) { + this.cacheDeploymentId = cacheDeploymentId; + this.caches = caches; + this.templates = templates; + } + + IgniteUuid cacheDeploymentId() { + return cacheDeploymentId; + } + + Map templates() { + return templates; + } + + Map caches() { + return caches; + } + + /** + * + */ + static class CacheInfo implements Serializable { + /** */ + private final CacheConfiguration ccfg; + + /** */ + private final CacheType cacheType; + + /** */ + private final byte flags; + + CacheInfo(CacheConfiguration ccfg, CacheType cacheType, byte flags) { + this.ccfg = ccfg; + this.cacheType = cacheType; + this.flags = flags; + } + + CacheConfiguration config() { + return ccfg; + } + + CacheType cacheType() { + return cacheType; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java new file mode 100644 index 0000000..10df452 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.Map; +import java.util.UUID; + +/** + * + */ +class CacheNodeCommonDiscoveryData implements Serializable { + /** */ + private final Map caches; + + /** */ + private final Map templates; + + /** */ + private final Map> clientNodesMap; + + CacheNodeCommonDiscoveryData(Map caches, + Map templates, + Map> clientNodesMap) { + this.caches = caches; + this.templates = templates; + this.clientNodesMap = clientNodesMap; + } + + Map caches() { + return caches; + } + + Map templates() { + return templates; + } + + Map> clientNodesMap() { + return clientNodesMap; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectClientDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectClientDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectClientDiscoveryData.java new file mode 100644 index 0000000..10a8f7e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectClientDiscoveryData.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; + +/** + * + */ +public class CacheReconnectClientDiscoveryData implements Serializable { +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java new file mode 100644 index 0000000..bd4ee1f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -0,0 +1,620 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheExistsException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.QuerySchema; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.DiscoveryDataBag; + +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; + +/** + * + */ +class ClusterCachesInfo { + /** */ + private final GridKernalContext ctx; + + /** Dynamic caches. */ + private final ConcurrentMap registeredCaches = new ConcurrentHashMap<>(); + + /** Cache templates. */ + private final ConcurrentMap registeredTemplates = new ConcurrentHashMap<>(); + + /** */ + private CacheJoinNodeDiscoveryData joinDiscoData; + + /** */ + private CacheNodeCommonDiscoveryData gridData; + + /** */ + private List locJoinStartCaches; + + /** + * @param ctx Context. + */ + ClusterCachesInfo(GridKernalContext ctx) { + this.ctx = ctx; + } + + void onStart(CacheJoinNodeDiscoveryData joinDiscoData) { + this.joinDiscoData = joinDiscoData; + } + + void onKernalStart() throws IgniteCheckedException { + + } + + /** + * @param batch Cache change request. + * @param topVer Topology version. + * @return {@code True} if minor topology version should be increased. + */ + boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) { + boolean incMinorTopVer = false; + + List added = null; + + for (DynamicCacheChangeRequest req : batch.requests()) { + if (req.template()) { + CacheConfiguration ccfg = req.startCacheConfiguration(); + + assert ccfg != null : req; + + DynamicCacheDescriptor desc = registeredTemplates().get(req.cacheName()); + + if (desc == null) { + DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx, + ccfg, + req.cacheType(), + true, + req.deploymentId(), + req.schema()); + + templateDesc.receivedFrom(req.initiatingNodeId()); + + DynamicCacheDescriptor old = registeredTemplates().put(ccfg.getName(), templateDesc); + + assert old == null; + + if (added == null) + added = new ArrayList<>(); + + added.add(templateDesc); + } + + ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId()); + + continue; + } + + DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName()); + + boolean needExchange = false; + + if (req.start()) { + if (desc == null) { + if (req.clientStartOnly()) { + ctx.cache().completeCacheStartFuture(req, new IgniteCheckedException("Failed to start " + + "client cache (a cache with the given name is not started): " + req.cacheName())); + } + else { + CacheConfiguration ccfg = req.startCacheConfiguration(); + + assert req.cacheType() != null : req; + assert F.eq(ccfg.getName(), req.cacheName()) : req; + + DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx, + ccfg, + req.cacheType(), + false, + req.deploymentId(), + req.schema()); + + startDesc.receivedFrom(req.initiatingNodeId()); + + DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc); + + assert old == null; + + ctx.discovery().setCacheFilter( + ccfg.getName(), + ccfg.getNodeFilter(), + ccfg.getNearConfiguration() != null, + ccfg.getCacheMode()); + + ctx.discovery().addClientNode(req.cacheName(), + req.initiatingNodeId(), + req.nearCacheConfiguration() != null); + + added.add(startDesc); + + needExchange = true; + } + } + else { + assert req.initiatingNodeId() != null : req; + + // Cache already exists, exchange is needed only if client cache should be created. + ClusterNode node = ctx.discovery().node(req.initiatingNodeId()); + + boolean clientReq = node != null && + !ctx.discovery().cacheAffinityNode(node, req.cacheName()); + + if (req.clientStartOnly()) { + needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(), + req.initiatingNodeId(), + req.nearCacheConfiguration() != null); + } + else { + if (req.failIfExists()) { + ctx.cache().completeCacheStartFuture(req, + new CacheExistsException("Failed to start cache " + + "(a cache with the same name is already started): " + req.cacheName())); + } + else { + needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(), + req.initiatingNodeId(), + req.nearCacheConfiguration() != null); + + if (needExchange) + req.clientStartOnly(true); + } + } + + if (needExchange) { + if (newTopVer == null) { + newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(), + topVer.minorTopologyVersion() + 1); + } + + desc.clientCacheStartVersion(newTopVer); + } + } + + if (!needExchange && desc != null) { + if (desc.clientCacheStartVersion() != null) + req.cacheFutureTopologyVersion(desc.clientCacheStartVersion()); + else + req.cacheFutureTopologyVersion(desc.startTopologyVersion()); + } + } + else if (req.globalStateChange() || req.resetLostPartitions()) + needExchange = true; + else { + assert req.stop() ^ req.close() : req; + + if (desc != null) { + if (req.stop()) { + DynamicCacheDescriptor old = cachesInfo.registeredCaches().remove(maskNull(req.cacheName())); + + assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']'; + + ctx.discovery().removeCacheFilter(req.cacheName()); + + needExchange = true; + } + else { + assert req.close() : req; + + needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId()); + } + } + } + + req.exchangeNeeded(needExchange); + + incMinorTopVer |= needExchange; + } + + if (added != null) { + AffinityTopologyVersion startTopVer = incMinorTopVer ? + new AffinityTopologyVersion(topVer.topologyVersion(), topVer.minorTopologyVersion() + 1) : topVer; + + for (DynamicCacheDescriptor desc : added) + desc.startTopologyVersion(startTopVer); + } + + return incMinorTopVer; + } + + CacheJoinNodeDiscoveryData joinDiscoveryData() { + if (cachesOnDisconnect != null) { +// Collection reqs; +// +// Map> clientNodesMap; +// +// reqs = new ArrayList<>(caches.size() + 1); +// +// clientNodesMap = U.newHashMap(caches.size()); +// +// collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId); + + // TODO + return null; + } + else { + assert ctx.config().isDaemon() || joinDiscoData != null; + + return joinDiscoData; + } + } + + /** + * @param reqs requests. + * @param clientNodesMap Client nodes map. + * @param nodeId Node id. + */ + private void collectDataOnReconnectingNode( + Collection caches, + Collection reqs, + Map> clientNodesMap, + UUID nodeId + ) { + for (GridCacheAdapter cache : caches) { + DynamicCacheDescriptor desc = cachesOnDisconnect.get(cache.name()); + + if (desc == null) + continue; + + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, cache.name(), null); + + req.startCacheConfiguration(desc.cacheConfiguration()); + req.cacheType(desc.cacheType()); + req.deploymentId(desc.deploymentId()); + req.receivedFrom(desc.receivedFrom()); + req.schema(desc.schema()); + + reqs.add(req); + + Boolean nearEnabled = cache.isNear(); + + Map map = U.newHashMap(1); + + map.put(nodeId, nearEnabled); + + clientNodesMap.put(cache.name(), map); + } + } + + /** + * Called from exchange worker. + * + * @return Caches to be started when this node starts. + */ + List cachesToStartOnLocalJoin() { + assert locJoinStartCaches != null; + + List locJoinStartCaches = this.locJoinStartCaches; + + this.locJoinStartCaches = null; + + return locJoinStartCaches; + } + + List cachesReceivedFromJoin(UUID joinedNodeId) { + assert joinedNodeId != null; + + List started = null; + + if (!ctx.clientNode() && !ctx.isDaemon()) { + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + if (desc.staticallyConfigured()) { + assert desc.receivedFrom() != null : desc; + + IgnitePredicate filter = desc.cacheConfiguration().getNodeFilter(); + + if (joinedNodeId.equals(desc.receivedFrom()) && + CU.affinityNode(ctx.discovery().localNode(), filter)) { + if (started == null) + started = new ArrayList<>(); + + started.add(desc); + } + } + } + } + + return started; + } + + /** + * Discovery event callback, executed from discovery thread. + * + * @param type Event type. + * @param node Event node. + * @param topVer Topology version. + */ + void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { + if (type == EVT_NODE_JOINED) { + if (node.id().equals(ctx.discovery().localNode().id())) { + if (gridData == null) { // First node starts. + assert registeredCaches.isEmpty(); + assert registeredTemplates.isEmpty(); + assert joinDiscoData != null; + + processJoiningNode(joinDiscoData, node.id()); + } + + assert locJoinStartCaches == null; + + locJoinStartCaches = new ArrayList<>(); + + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheConfiguration cfg = desc.cacheConfiguration(); + + boolean locCfg = joinDiscoData.caches().containsKey(cfg.getName()); + + if (locCfg || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter())) + locJoinStartCaches.add(desc); + } + + joinDiscoData = null; + } + + initStartVersionOnJoin(registeredCaches.values(), node, topVer); + + initStartVersionOnJoin(registeredTemplates.values(), node, topVer); + } + } + + private void initStartVersionOnJoin(Collection descs, + ClusterNode joinedNode, + AffinityTopologyVersion topVer) { + for (DynamicCacheDescriptor cacheDesc : descs) { + if (cacheDesc.staticallyConfigured() && joinedNode.id().equals(cacheDesc.receivedFrom())) + cacheDesc.startTopologyVersion(topVer); + } + } + + CacheNodeCommonDiscoveryData collectCommonDiscoveryData() { + Map caches = new HashMap<>(); + + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheData cacheData = new CacheData(desc.cacheConfiguration(), + desc.cacheId(), + desc.cacheType(), + desc.startTopologyVersion(), + desc.deploymentId(), + desc.schema(), + desc.receivedFrom(), + desc.staticallyConfigured(), + false); + + caches.put(desc.cacheConfiguration().getName(), cacheData); + } + + Map templates = new HashMap<>(); + + for (DynamicCacheDescriptor desc : registeredTemplates.values()) { + CacheData cacheData = new CacheData(desc.cacheConfiguration(), + 0, + desc.cacheType(), + desc.startTopologyVersion(), + null, + desc.schema(), + desc.receivedFrom(), + desc.staticallyConfigured(), + true); + + templates.put(desc.cacheConfiguration().getName(), cacheData); + } + + return new CacheNodeCommonDiscoveryData(caches, templates, ctx.discovery().clientNodesMap()); + } + + void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { + assert joinDiscoData != null; + assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data; + + CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData(); + + for (CacheData cacheData : cachesData.templates().values()) { + DynamicCacheDescriptor desc = new DynamicCacheDescriptor( + ctx, + cacheData.cacheConfiguration(), + cacheData.cacheType(), + true, + cacheData.deploymentId(), + cacheData.schema()); + + desc.startTopologyVersion(cacheData.startTopologyVersion()); + desc.receivedFrom(cacheData.receivedFrom()); + desc.staticallyConfigured(cacheData.staticallyConfigured()); + + DynamicCacheDescriptor old = registeredTemplates.put(cacheData.cacheConfiguration().getName(), desc); + + assert old == null; + } + + for (CacheData cacheData : cachesData.caches().values()) { + CacheConfiguration cfg = cacheData.cacheConfiguration(); + + DynamicCacheDescriptor desc = new DynamicCacheDescriptor( + ctx, + cacheData.cacheConfiguration(), + cacheData.cacheType(), + false, + cacheData.deploymentId(), + cacheData.schema()); + + desc.startTopologyVersion(cacheData.startTopologyVersion()); + desc.receivedFrom(cacheData.receivedFrom()); + desc.staticallyConfigured(cacheData.staticallyConfigured()); + + DynamicCacheDescriptor old = registeredCaches.put(cacheData.cacheConfiguration().getName(), desc); + + assert old == null; + + ctx.discovery().setCacheFilter( + cfg.getName(), + cfg.getNodeFilter(), + cfg.getNearConfiguration() != null, + cfg.getCacheMode()); + } + + if (!F.isEmpty(cachesData.clientNodesMap())) { + for (Map.Entry> entry : cachesData.clientNodesMap().entrySet()) { + String cacheName = entry.getKey(); + + for (Map.Entry tup : entry.getValue().entrySet()) + ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue()); + } + } + + gridData = cachesData; + } + + void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { + if (data.hasJoiningNodeData()) { + Serializable joiningNodeData = data.joiningNodeData(); + + if (joiningNodeData instanceof CacheReconnectClientDiscoveryData) { + // TODO + } + else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) + processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId()); + } + } + + private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) { + for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) { + CacheConfiguration cfg = cacheInfo.config(); + + if (!registeredTemplates.containsKey(cfg.getName())) { + DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, + cfg, + cacheInfo.cacheType(), + true, + joinData.cacheDeploymentId(), + new QuerySchema(cfg.getQueryEntities())); + + desc.staticallyConfigured(true); + desc.receivedFrom(nodeId); + + DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc); + + assert old == null : old; + } + } + + for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) { + CacheConfiguration cfg = cacheInfo.config(); + + if (!registeredCaches.containsKey(cfg.getName())) { + DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, + cfg, + cacheInfo.cacheType(), + false, + joinData.cacheDeploymentId(), + new QuerySchema(cfg.getQueryEntities())); + + desc.staticallyConfigured(true); + desc.receivedFrom(nodeId); + + DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc); + + assert old == null : old; + + ctx.discovery().setCacheFilter( + cfg.getName(), + cfg.getNodeFilter(), + cfg.getNearConfiguration() != null, + cfg.getCacheMode()); + } + + ctx.discovery().addClientNode(cfg.getName(), + nodeId, + cfg.getNearConfiguration() != null); + } + } + + ConcurrentMap registeredCaches() { + return registeredCaches; + } + + ConcurrentMap registeredTemplates() { + return registeredTemplates; + } + + /** */ + private Map cachesOnDisconnect; + + void onDisconnect() { + cachesOnDisconnect = new HashMap<>(registeredCaches); + + registeredCaches.clear(); + registeredTemplates.clear(); + } + + Set onReconnected() { + assert cachesOnDisconnect != null; + + Set stoppedCaches = new HashSet<>(); + + for(Map.Entry e : cachesOnDisconnect.entrySet()) { + DynamicCacheDescriptor desc = e.getValue(); + + String name = e.getKey(); + + boolean stopped; + + boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name); + + if (!sysCache) { + DynamicCacheDescriptor newDesc = registeredCaches.get(name); + + stopped = newDesc == null || !desc.deploymentId().equals(newDesc.deploymentId()); + } + else + stopped = false; + + if (stopped) + stoppedCaches.add(name); + } + + cachesOnDisconnect = null; + + return stoppedCaches; + } + + void clearCaches() { + registeredCaches.clear(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 92a7af3..a2e91e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -50,9 +50,6 @@ public class DynamicCacheDescriptor { /** Statically configured flag. */ private boolean staticCfg; - /** Started flag. */ - private boolean started; - /** Cache type. */ private CacheType cacheType; http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 92c144c..67f25b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -237,6 +237,8 @@ public class GridCacheContext implements Externalizable { /** Start topology version. */ private AffinityTopologyVersion startTopVer; + private AffinityTopologyVersion cacheStartTopVer; + /** Dynamic cache deployment ID. */ private IgniteUuid dynamicDeploymentId; @@ -458,6 +460,10 @@ public class GridCacheContext implements Externalizable { this.startTopVer = startTopVer; } + public void cacheStartTopologyVersion(AffinityTopologyVersion cacheStartTopVer) { + this.cacheStartTopVer = cacheStartTopVer; + } + /** * @return Cache default {@link ExpiryPolicy}. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index fdd29e4..b9c066b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -150,12 +150,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId()); - if (cacheDesc != null) { - if (cacheDesc.startTopologyVersion() != null) - startTopVer = cacheDesc.startTopologyVersion(); - else if (cacheDesc.receivedFromStartVersion() != null) - startTopVer = cacheDesc.receivedFromStartVersion(); - } + // TODO: should be specified on request since cache desc can be removed, + if (cacheDesc != null) + startTopVer = cacheDesc.startTopologyVersion(); // Need to wait for exchange to avoid race between cache start and affinity request. fut = cctx.exchange().affinityReadyFuture(startTopVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 4b79361..ecbf475 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -152,7 +152,6 @@ import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS; import static org.apache.ignite.configuration.DeploymentMode.ISOLATED; import static org.apache.ignite.configuration.DeploymentMode.PRIVATE; import static org.apache.ignite.configuration.DeploymentMode.SHARED; -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC; import static org.apache.ignite.internal.IgniteComponentType.JTA; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; @@ -191,11 +190,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Template configuration add futures. */ private ConcurrentMap pendingTemplateFuts = new ConcurrentHashMap<>(); - /** Dynamic caches. */ - private ConcurrentMap registeredCaches = new ConcurrentHashMap<>(); - - /** Cache templates. */ - private ConcurrentMap registeredTemplates = new ConcurrentHashMap<>(); + /** */ + private ClusterCachesInfo cachesInfo; /** */ private IdentityHashMap sesHolders = new IdentityHashMap<>(); @@ -207,9 +203,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { private final CountDownLatch cacheStartedLatch = new CountDownLatch(1); /** */ - private Map cachesOnDisconnect; - - /** */ private Map clientReconnectReqs; /** Internal cache names. */ @@ -389,16 +382,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param cc Cache Configuration. * @return {@code true} if cache is starting on client node and this node is affinity node for the cache. */ - private boolean storesLocallyOnClient(IgniteConfiguration c, - CacheConfiguration cc) { + private boolean storesLocallyOnClient(IgniteConfiguration c, CacheConfiguration cc) { if (c.isClientMode() && c.getMemoryConfiguration() == null) { if (cc.getCacheMode() == LOCAL) return true; - if (ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) - return true; + return ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()); - return false; } else return false; @@ -623,6 +613,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings({"unchecked"}) @Override public void start(boolean activeOnStart) throws IgniteCheckedException { + cachesInfo = new ClusterCachesInfo(ctx); + DeploymentMode depMode = ctx.config().getDeploymentMode(); if (!F.isEmpty(ctx.config().getCacheConfiguration())) { @@ -643,72 +635,31 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheSharedManager mgr : sharedCtx.managers()) mgr.start(sharedCtx); - //if inActivate on start then skip registrate caches - if (!activeOnStart) - return; + if (activeOnStart && !ctx.config().isDaemon()) { + Map caches = new HashMap<>(); - CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); + Map templates = new HashMap<>(); - registerCacheFromConfig(cfgs); + registerCacheFromConfig(caches, templates); - registerCacheFromPersistentStore(cfgs); + registerCacheFromPersistentStore(caches, templates); - if (log.isDebugEnabled()) - log.debug("Started cache processor."); - } - - /** - * @param cfgs Cache configurations. - * @throws IgniteCheckedException If failed. - */ - private void registerCacheFromConfig(CacheConfiguration[] cfgs) throws IgniteCheckedException { - for (int i = 0; i < cfgs.length; i++) { - if (ctx.config().isDaemon()) - continue; - - CacheConfiguration cfg = new CacheConfiguration(cfgs[i]); - - cfgs[i] = cfg; // Replace original configuration value. - - registerCache(cfg); + cachesInfo.onStart(new CacheJoinNodeDiscoveryData(IgniteUuid.randomUuid(), caches, templates)); } - } - - /** - * @param cfgs Cache configurations. - * @throws IgniteCheckedException If failed. - */ - private void registerCacheFromPersistentStore(CacheConfiguration[] cfgs) throws IgniteCheckedException { - if (sharedCtx.pageStore() != null && - sharedCtx.database().persistenceEnabled() && - !ctx.config().isDaemon()) { - - Set savedCacheNames = sharedCtx.pageStore().savedCacheNames(); - - for (CacheConfiguration cfg : cfgs) - savedCacheNames.remove(cfg.getName()); - - for (String name : internalCaches) - savedCacheNames.remove(name); - - if (!F.isEmpty(savedCacheNames)) { - log.info("Registrate persistent caches: " + savedCacheNames); - - for (String name : savedCacheNames) { - CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name); - if (cfg != null) - registerCache(cfg); - } - } - } + if (log.isDebugEnabled()) + log.debug("Started cache processor."); } /** * @param cfg Cache configuration. + * @param caches Caches map. + * @param templates Templates map. * @throws IgniteCheckedException If failed. */ - private void registerCache(CacheConfiguration cfg) throws IgniteCheckedException { + private void registerCache(CacheConfiguration cfg, + Map caches, + Map templates) throws IgniteCheckedException { cloneCheckSerializable(cfg); CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg); @@ -716,20 +667,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Initialize defaults. initialize(cfg, cacheObjCtx); - String masked = maskNull(cfg.getName()); - - if (cacheDescriptor(cfg.getName()) != null) { - String cacheName = cfg.getName(); + boolean template = cfg.getName() != null && cfg.getName().endsWith("*"); - if (cacheName != null) + if (!template) { + if (caches.containsKey(cfg.getName())) { throw new IgniteCheckedException("Duplicate cache name found (check configuration and " + - "assign unique name to each cache): " + U.maskName(cacheName)); - else - throw new IgniteCheckedException("Default cache has already been configured (check configuration and " + - "assign unique name to each cache)."); - } + "assign unique name to each cache): " + cfg.getName()); + } - CacheType cacheType; + CacheType cacheType; if (CU.isUtilityCache(cfg.getName())) cacheType = CacheType.UTILITY; @@ -738,63 +684,163 @@ public class GridCacheProcessor extends GridProcessorAdapter { else cacheType = CacheType.USER; - if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null) - cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName()); - - boolean template = cfg.getName() != null && cfg.getName().endsWith("*"); - - DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, - cfg, - cacheType, - template, - IgniteUuid.randomUuid(), - new QuerySchema(cfg.getQueryEntities())); - - desc.locallyConfigured(true); - desc.staticallyConfigured(true); - desc.receivedFrom(ctx.localNodeId()); - - if (!template) { - cacheDescriptor(cfg.getName(), desc); - - ctx.discovery().setCacheFilter( - cfg.getName(), - cfg.getNodeFilter(), - cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED, - cfg.getCacheMode()); - - ctx.discovery().addClientNode(cfg.getName(), - ctx.localNodeId(), - cfg.getNearConfiguration() != null); + if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null) + cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName()); if (!cacheType.userCache()) stopSeq.addLast(cfg.getName()); else stopSeq.addFirst(cfg.getName()); + + caches.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, cacheType, (byte)0)); } - else { - if (log.isDebugEnabled()) - log.debug("Use cache configuration as template: " + cfg); + else + templates.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, CacheType.USER, (byte)0)); + } + + /** + * @param caches Caches map. + * @param templates Templates map. + * @throws IgniteCheckedException If failed. + */ + private void registerCacheFromConfig( + Map caches, + Map templates + ) throws IgniteCheckedException { + assert !ctx.config().isDaemon(); + + CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); + + for (int i = 0; i < cfgs.length; i++) { + CacheConfiguration cfg = new CacheConfiguration(cfgs[i]); - registeredTemplates.put(masked, desc); + registerCache(cfg, caches, templates); } + } - if (cfg.getName() == null) { // Use cache configuration with null name as template. - DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx, - cfg, - cacheType, - true, - IgniteUuid.randomUuid(), - new QuerySchema(cfg.getQueryEntities())); + /** + * @param caches Caches map. + * @param templates Templates map. + * @throws IgniteCheckedException If failed. + */ + private void registerCacheFromPersistentStore( + Map caches, + Map templates + ) throws IgniteCheckedException { + assert !ctx.config().isDaemon(); - desc0.locallyConfigured(true); - desc0.staticallyConfigured(true); + if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) { + Set savedCacheNames = sharedCtx.pageStore().savedCacheNames(); - registeredTemplates.put(masked, desc0); + savedCacheNames.removeAll(caches.keySet()); + + savedCacheNames.removeAll(internalCaches); + + if (!F.isEmpty(savedCacheNames)) { + if (log.isInfoEnabled()) + log.info("Register persistent caches: " + savedCacheNames); + + for (String name : savedCacheNames) { + CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name); + + if (cfg != null) + registerCache(cfg, caches, templates); + } + } } } /** + * @param cfg Cache configuration. + * @throws IgniteCheckedException If failed. + */ + private void registerCache(CacheConfiguration cfg) throws IgniteCheckedException { +// cloneCheckSerializable(cfg); +// +// CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg); +// +// // Initialize defaults. +// initialize(cfg, cacheObjCtx); +// +// String masked = maskNull(cfg.getName()); +// +// if (cacheDescriptor(cfg.getName()) != null) { +// String cacheName = cfg.getName(); +// +// if (cacheName != null) +// throw new IgniteCheckedException("Duplicate cache name found (check configuration and " + +// "assign unique name to each cache): " + U.maskName(cacheName)); +// else +// throw new IgniteCheckedException("Default cache has already been configured (check configuration and " + +// "assign unique name to each cache)."); +// } +// +// CacheType cacheType; +// +// if (CU.isUtilityCache(cfg.getName())) +// cacheType = CacheType.UTILITY; +// else if (internalCaches.contains(maskNull(cfg.getName()))) +// cacheType = CacheType.INTERNAL; +// else +// cacheType = CacheType.USER; +// +// if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null) +// cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName()); +// +// boolean template = cfg.getName() != null && cfg.getName().endsWith("*"); +// +// DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, +// cfg, +// cacheType, +// template, +// IgniteUuid.randomUuid(), +// new QuerySchema(cfg.getQueryEntities())); +// +// desc.locallyConfigured(true); +// desc.staticallyConfigured(true); +// desc.receivedFrom(ctx.localNodeId()); +// +// if (!template) { +// cacheDescriptor(cfg.getName(), desc); +// +// ctx.discovery().setCacheFilter( +// cfg.getName(), +// cfg.getNodeFilter(), +// cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED, +// cfg.getCacheMode()); +// +// ctx.discovery().addClientNode(cfg.getName(), +// ctx.localNodeId(), +// cfg.getNearConfiguration() != null); +// +// if (!cacheType.userCache()) +// stopSeq.addLast(cfg.getName()); +// else +// stopSeq.addFirst(cfg.getName()); +// } +// else { +// if (log.isDebugEnabled()) +// log.debug("Use cache configuration as template: " + cfg); +// +// registeredTemplates.put(masked, desc); +// } +// +// if (cfg.getName() == null) { // Use cache configuration with null name as template. +// DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx, +// cfg, +// cacheType, +// true, +// IgniteUuid.randomUuid(), +// new QuerySchema(cfg.getQueryEntities())); +// +// desc0.locallyConfigured(true); +// desc0.staticallyConfigured(true); +// +// registeredTemplates.put(masked, desc0); +// } + } + + /** * Initialize internal cache names */ private void initializeInternalCacheNames() { @@ -864,54 +910,55 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.query().onCacheKernalStart(); // Start dynamic caches received from collect discovery data. - for (DynamicCacheDescriptor desc : cacheDescriptors()) { - if (ctx.config().isDaemon()) - continue; - - desc.clearRemoteConfigurations(); - - CacheConfiguration ccfg = desc.cacheConfiguration(); - - IgnitePredicate filter = ccfg.getNodeFilter(); - - boolean loc = desc.locallyConfigured(); - - if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) { - boolean started = desc.onStart(); - - assert started : "Failed to change started flag for locally configured cache: " + desc; - - CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - - CachePluginManager pluginMgr = desc.pluginManager(); - - GridCacheContext ctx = createCache( - ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed()); - - ctx.dynamicDeploymentId(desc.deploymentId()); - - sharedCtx.addCacheContext(ctx); - - GridCacheAdapter cache = ctx.cache(); - - String name = ccfg.getName(); - - caches.put(maskNull(name), cache); - - startCache(cache, desc.schema()); - - jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); - } - } +// for (DynamicCacheDescriptor desc : cacheDescriptors()) { +// if (ctx.config().isDaemon()) +// continue; +// +// desc.clearRemoteConfigurations(); +// +// CacheConfiguration ccfg = desc.cacheConfiguration(); +// +// IgnitePredicate filter = ccfg.getNodeFilter(); +// +// boolean loc = desc.locallyConfigured(); +// +// if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) { +// boolean started = desc.onStart(); +// +// assert started : "Failed to change started flag for locally configured cache: " + desc; +// +// CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); +// +// CachePluginManager pluginMgr = desc.pluginManager(); +// +// GridCacheContext ctx = createCache( +// ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed()); +// +// ctx.dynamicDeploymentId(desc.deploymentId()); +// +// sharedCtx.addCacheContext(ctx); +// +// GridCacheAdapter cache = ctx.cache(); +// +// String name = ccfg.getName(); +// +// caches.put(maskNull(name), cache); +// +// startCache(cache, desc.schema()); +// +// jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); +// } +// } } finally { cacheStartedLatch.countDown(); } // Must call onKernalStart on shared managers after creation of fetched caches. - for (GridCacheSharedManager mgr : sharedCtx.managers()) + for (GridCacheSharedManager mgr : sharedCtx.managers()) { if (sharedCtx.database() != mgr) mgr.onKernalStart(false); + } // Escape if start active on start false if (!activeOnStart) @@ -925,23 +972,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.service().onUtilityCacheStarted(); - // Wait for caches in SYNC preload mode. - for (DynamicCacheDescriptor desc : cacheDescriptors()) { - CacheConfiguration cfg = desc.cacheConfiguration(); + AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(locNode.order(), 0); - IgnitePredicate filter = cfg.getNodeFilter(); - - if (desc.locallyConfigured() || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) { - GridCacheAdapter cache = caches.get(maskNull(cfg.getName())); + for (GridCacheAdapter cache : caches.values()) { + CacheConfiguration cfg = cache.configuration(); - if (cache != null) { - if (cfg.getRebalanceMode() == SYNC) { - CacheMode cacheMode = cfg.getCacheMode(); + if (cache.context().affinityNode() && + cfg.getRebalanceMode() == SYNC && + startTopVer.equals(cache.context().startTopologyVersion())) { + CacheMode cacheMode = cfg.getCacheMode(); - if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0)) - cache.preloader().syncFuture().get(); - } - } + if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0)) + cache.preloader().syncFuture().get(); } } @@ -1031,7 +1073,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { stopCache(cache, cancel, false); } - registeredCaches.clear(); + cachesInfo.clearCaches(); } /** @@ -1102,8 +1144,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture reconnectFut) throws IgniteCheckedException { - cachesOnDisconnect = new HashMap<>(registeredCaches); - IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException( ctx.cluster().clientReconnectFuture(), "Failed to execute dynamic cache change request, client node disconnected."); @@ -1130,9 +1170,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.onDisconnected(reconnectFut); - registeredCaches.clear(); - - registeredTemplates.clear(); + cachesInfo.onDisconnect(); } /** {@inheritDoc} */ @@ -1141,24 +1179,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCompoundFuture stopFut = null; - for (final GridCacheAdapter cache : caches.values()) { - String name = cache.name(); - - boolean stopped; + Set stoppedCaches = cachesInfo.onReconnected(); - boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name); - - if (!sysCache) { - DynamicCacheDescriptor oldDesc = cachesOnDisconnect.get(maskNull(name)); - - assert oldDesc != null : "No descriptor for cache: " + name; - - DynamicCacheDescriptor newDesc = cacheDescriptor(name); - - stopped = newDesc == null || !oldDesc.deploymentId().equals(newDesc.deploymentId()); - } - else - stopped = false; + for (final GridCacheAdapter cache : caches.values()) { + boolean stopped = stoppedCaches.contains(cache.name()); if (stopped) { cache.context().gate().reconnected(true); @@ -1185,11 +1209,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { reconnected.add(cache); - if (!sysCache) { + if (cache.context().userCache()) { // Re-create cache structures inside indexing in order to apply recent schema changes. GridCacheContext cctx = cache.context(); - DynamicCacheDescriptor desc = cacheDescriptor(name); + DynamicCacheDescriptor desc = cacheDescriptor(cctx.name()); assert desc != null; @@ -1211,8 +1235,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheAdapter cache : reconnected) cache.context().gate().reconnected(false); - cachesOnDisconnect = null; - if (stopFut != null) stopFut.markInitialized(); @@ -1735,17 +1757,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Collection of started cache names. */ public Collection cacheNames() { - return F.viewReadOnly(cacheDescriptors(), - new IgniteClosure() { - @Override public String apply(DynamicCacheDescriptor desc) { - return desc.cacheConfiguration().getName(); - } - }, - new IgnitePredicate() { - @Override public boolean apply(DynamicCacheDescriptor desc) { - return desc.started(); - } - }); + return F.viewReadOnly(cacheDescriptors(), new IgniteClosure() { + @Override public String apply(DynamicCacheDescriptor desc) { + return desc.cacheConfiguration().getName(); + } + }); } /** @@ -1768,7 +1784,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } if (start) { - for (Map.Entry e : registeredCaches.entrySet()) { + for (Map.Entry e : cachesInfo.registeredCaches().entrySet()) { DynamicCacheDescriptor desc = e.getValue(); CacheConfiguration ccfg = desc.cacheConfiguration(); @@ -1828,9 +1844,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName()); - if (desc != null) - desc.onStart(); - prepareCacheStart( req.startCacheConfiguration(), req.nearCacheConfiguration(), @@ -1838,48 +1851,55 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.clientStartOnly(), req.initiatingNodeId(), req.deploymentId(), + desc.startTopologyVersion(), topVer, desc != null ? desc.schema() : null ); } + public void startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException { + List caches = cachesInfo.cachesToStartOnLocalJoin(); + + for (DynamicCacheDescriptor desc : caches) { + prepareCacheStart( + desc.cacheConfiguration(), + null, + desc.cacheType(), + false, + null, + desc.deploymentId(), + desc.startTopologyVersion(), + exchTopVer, + desc.schema() + ); + } + } + /** * Starts statically configured caches received from remote nodes during exchange. * - * @param topVer Topology version. + * @param nodeId Joining node ID. + * @param exchTopVer Current exchange version. * @return Started caches descriptors. * @throws IgniteCheckedException If failed. */ - public Collection startReceivedCaches(AffinityTopologyVersion topVer) + public Collection startReceivedCaches(UUID nodeId, AffinityTopologyVersion exchTopVer) throws IgniteCheckedException { - List started = null; + List started = cachesInfo.cachesReceivedFromJoin(nodeId); - for (DynamicCacheDescriptor desc : cacheDescriptors()) { - if (!desc.started() && desc.staticallyConfigured() && !desc.locallyConfigured()) { - if (desc.receivedFrom() != null) { - AffinityTopologyVersion startVer = desc.receivedFromStartVersion(); - - if (startVer == null || startVer.compareTo(topVer) > 0) - continue; - } - - if (desc.onStart()) { - if (started == null) - started = new ArrayList<>(); - - started.add(desc); - - prepareCacheStart( - desc.cacheConfiguration(), - null, - desc.cacheType(), - false, - null, - desc.deploymentId(), - topVer, - desc.schema() - ); - } + if (started != null) { + for (DynamicCacheDescriptor desc : started) { + prepareCacheStart( + desc.cacheConfiguration(), + null, + desc.cacheType(), + false, + null, + desc.deploymentId(), + desc.startTopologyVersion(), + exchTopVer, + desc.schema() + ); } } @@ -1893,7 +1913,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param clientStartOnly Client only start request. * @param initiatingNodeId Initiating node ID. * @param deploymentId Deployment ID. - * @param topVer Topology version. + * @param cacheStartTopVer Cache start topology version. + * @param exchTopVer Current exchange version. * @param schema Query schema. * @throws IgniteCheckedException If failed. */ @@ -1904,7 +1925,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean clientStartOnly, UUID initiatingNodeId, IgniteUuid deploymentId, - AffinityTopologyVersion topVer, + AffinityTopologyVersion cacheStartTopVer, + AffinityTopologyVersion exchTopVer, @Nullable QuerySchema schema ) throws IgniteCheckedException { CacheConfiguration ccfg = new CacheConfiguration(cfg); @@ -1916,8 +1938,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter); boolean clientNodeStart = locNode.id().equals(initiatingNodeId); - if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null) - return; + assert !caches.containsKey(ccfg.getName()) : ccfg.getName(); if (affNodeStart || clientNodeStart || CU.isSystemCache(cfg.getName())) { if (clientNodeStart && !affNodeStart) { @@ -1931,7 +1952,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true); - cacheCtx.startTopologyVersion(topVer); + cacheCtx.startTopologyVersion(exchTopVer); + cacheCtx.cacheStartTopologyVersion(cacheStartTopVer); cacheCtx.dynamicDeploymentId(deploymentId); @@ -1950,7 +1972,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param req Stop request. */ - public void blockGateway(DynamicCacheChangeRequest req) { + void blockGateway(DynamicCacheChangeRequest req) { assert req.stop() || req.close(); if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) { @@ -2127,263 +2149,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId())); + dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), cachesInfo.joinDiscoveryData()); } /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - dataBag.addNodeSpecificData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId())); - } - - /** - * @param joiningNodeId Joining node id. - */ - private Serializable getDiscoveryData(UUID joiningNodeId) { - boolean reconnect = ctx.localNodeId().equals(joiningNodeId) && cachesOnDisconnect != null; - - // Collect dynamically started caches to a single object. - Collection reqs; - - Map> clientNodesMap; - - if (reconnect) { - reqs = new ArrayList<>(caches.size() + 1); - - clientNodesMap = U.newHashMap(caches.size()); - - collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId); - } - else { - reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size() + 1); - - clientNodesMap = ctx.discovery().clientNodesMap(); - - collectDataOnGridNode(reqs); - } - - DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs); - - batch.clientNodes(clientNodesMap); - - batch.clientReconnect(reconnect); - - // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same. - batch.id(null); - - return batch; - } - - /** - * @param reqs requests. - */ - private void collectDataOnGridNode(Collection reqs) { - for (DynamicCacheDescriptor desc : cacheDescriptors()) { - // RequestId must be null because on different node will be different byte [] and - // we get duplicate discovery data, for more details see - // TcpDiscoveryNodeAddedMessage#addDiscoveryData. - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(), - null); - - req.startCacheConfiguration(desc.cacheConfiguration()); - req.cacheType(desc.cacheType()); - req.deploymentId(desc.deploymentId()); - req.receivedFrom(desc.receivedFrom()); - req.schema(desc.schema()); - - reqs.add(req); - } - - for (DynamicCacheDescriptor desc : registeredTemplates.values()) { - // RequestId must be null because on different node will be different byte [] and - // we get duplicate discovery data, for more details see - // TcpDiscoveryNodeAddedMessage#addDiscoveryData. - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(), - null); - - req.startCacheConfiguration(desc.cacheConfiguration()); - req.schema(desc.schema()); - - req.template(true); - - reqs.add(req); - } - } - - /** - * @param reqs requests. - * @param clientNodesMap Client nodes map. - * @param nodeId Node id. - */ - private void collectDataOnReconnectingNode( - Collection reqs, - Map> clientNodesMap, - UUID nodeId - ) { - for (GridCacheAdapter cache : caches.values()) { - DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name())); - - if (desc == null) - continue; - - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, cache.name(), null); - - req.startCacheConfiguration(desc.cacheConfiguration()); - req.cacheType(desc.cacheType()); - req.deploymentId(desc.deploymentId()); - req.receivedFrom(desc.receivedFrom()); - req.schema(desc.schema()); - - reqs.add(req); - - Boolean nearEnabled = cache.isNear(); - - Map map = U.newHashMap(1); - - map.put(nodeId, nearEnabled); - - clientNodesMap.put(cache.name(), map); - } + if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal())) + dataBag.addGridCommonData(CACHE_PROC.ordinal(), cachesInfo.collectCommonDiscoveryData()); } /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { - if (data.hasJoiningNodeData()) { - Serializable joiningNodeData = data.joiningNodeData(); - if (joiningNodeData instanceof DynamicCacheChangeBatch) - onDiscoDataReceived( - data.joiningNodeId(), - data.joiningNodeId(), - (DynamicCacheChangeBatch) joiningNodeData, true); - } + cachesInfo.onJoiningNodeDataReceived(data); } /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecData = data.nodeSpecificData(); - - if (nodeSpecData != null) { - for (Map.Entry e : nodeSpecData.entrySet()) { - if (e.getValue() != null && e.getValue() instanceof DynamicCacheChangeBatch) { - DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch) e.getValue(); - - onDiscoDataReceived(data.joiningNodeId(), e.getKey(), batch, false); - } - } - } - } - - /** - * @param joiningNodeId Joining node id. - * @param rmtNodeId Rmt node id. - * @param batch Batch. - * @param join Whether this is data from joining node. - */ - private void onDiscoDataReceived(UUID joiningNodeId, UUID rmtNodeId, DynamicCacheChangeBatch batch, boolean join) { - if (batch.clientReconnect()) { - if (ctx.clientDisconnected()) { - if (clientReconnectReqs == null) - clientReconnectReqs = new LinkedHashMap<>(); - - clientReconnectReqs.put(joiningNodeId, batch); - - return; - } - - processClientReconnectData(joiningNodeId, batch); - } - else { - for (DynamicCacheChangeRequest req : batch.requests()) { - initReceivedCacheConfiguration(req); - - if (req.template()) { - CacheConfiguration ccfg = req.startCacheConfiguration(); - - assert ccfg != null : req; - - DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName())); - - if (existing == null) { - DynamicCacheDescriptor desc = new DynamicCacheDescriptor( - ctx, - ccfg, - req.cacheType(), - true, - req.deploymentId(), - req.schema()); - - registeredTemplates.put(maskNull(req.cacheName()), desc); - } - - continue; - } - - DynamicCacheDescriptor existing = cacheDescriptor(req.cacheName()); - - if (req.start() && !req.clientStartOnly()) { - CacheConfiguration ccfg = req.startCacheConfiguration(); - - if (existing != null) { - if (joiningNodeId.equals(ctx.localNodeId())) { - existing.receivedFrom(req.receivedFrom()); - existing.deploymentId(req.deploymentId()); - } - - if (existing.locallyConfigured()) { - existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration()); - - if (!join) - // Overwrite existing with remote. - existing.schema(req.schema()); - - ctx.discovery().setCacheFilter( - req.cacheName(), - ccfg.getNodeFilter(), - ccfg.getNearConfiguration() != null, - ccfg.getCacheMode()); - } - } - else { - assert req.cacheType() != null : req; - - DynamicCacheDescriptor desc = new DynamicCacheDescriptor( - ctx, - ccfg, - req.cacheType(), - false, - req.deploymentId(), - req.schema()); - - // Received statically configured cache. - if (req.initiatingNodeId() == null) - desc.staticallyConfigured(true); - - if (joiningNodeId.equals(ctx.localNodeId())) - desc.receivedOnDiscovery(true); - - desc.receivedFrom(req.receivedFrom()); - - DynamicCacheDescriptor old = cacheDescriptor(req.cacheName(), desc); - - assert old == null : old; - - ctx.discovery().setCacheFilter( - req.cacheName(), - ccfg.getNodeFilter(), - ccfg.getNearConfiguration() != null, - ccfg.getCacheMode()); - } - } - } - - if (!F.isEmpty(batch.clientNodes())) { - for (Map.Entry> entry : batch.clientNodes().entrySet()) { - String cacheName = entry.getKey(); - - for (Map.Entry tup : entry.getValue().entrySet()) - ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue()); - } - } - } + cachesInfo.onGridDataReceived(data); } /** @@ -2469,7 +2251,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { List wildcardNameCfgs = null; - for (DynamicCacheDescriptor desc : registeredTemplates.values()) { + for (DynamicCacheDescriptor desc : cachesInfo.registeredTemplates().values()) { assert desc.template(); CacheConfiguration cfg = desc.cacheConfiguration(); @@ -2744,7 +2526,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { checkEmptyTransactions(); if (F.isEmpty(cacheNames)) - cacheNames = registeredCaches.keySet(); + cacheNames = cachesInfo.registeredCaches().keySet(); Collection reqs = new ArrayList<>(cacheNames.size()); @@ -2965,12 +2747,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param topVer Topology version. */ public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { - if (type == EVT_NODE_JOINED) { - for (DynamicCacheDescriptor cacheDesc : cacheDescriptors()) { - if (node.id().equals(cacheDesc.receivedFrom())) - cacheDesc.receivedFromStartVersion(topVer); - } - } + cachesInfo.onDiscoveryEvent(type, node, topVer); sharedCtx.affinity().onDiscoveryEvent(type, node, topVer); } @@ -2997,11 +2774,28 @@ public class GridCacheProcessor extends GridProcessorAdapter { return true; if (msg instanceof DynamicCacheChangeBatch) - return onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer); + return cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer); return false; } + void completeTemplateAddFuture(String name, IgniteUuid deploymentId) { + GridCacheProcessor.TemplateConfigurationFuture fut = + (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(name); + + if (fut != null && fut.deploymentId().equals(deploymentId)) + fut.onDone(); + } + + void completeCacheStartFuture(DynamicCacheChangeRequest req, Exception err) { + if (ctx.localNodeId().equals(req.initiatingNodeId())) { + DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId()); + + if (fut != null && F.eq(req.deploymentId(), fut.deploymentId())) + fut.onDone(err); + } + } + /** * @param batch Change request batch. * @param topVer Current topology version. @@ -3023,13 +2817,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert ccfg != null : req; - DynamicCacheDescriptor desc = registeredTemplates.get(maskNull(req.cacheName())); + DynamicCacheDescriptor desc = cachesInfo.registeredTemplates().get(maskNull(req.cacheName())); if (desc == null) { DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true, req.deploymentId(), req.schema()); - DynamicCacheDescriptor old = registeredTemplates.put(maskNull(ccfg.getName()), templateDesc); + DynamicCacheDescriptor old = cachesInfo.registeredTemplates().put(maskNull(ccfg.getName()), templateDesc); assert old == null : "Dynamic cache map was concurrently modified [new=" + templateDesc + ", old=" + old + ']'; @@ -3080,7 +2874,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { startDesc.startTopologyVersion(newTopVer); - DynamicCacheDescriptor old = cacheDescriptor(ccfg.getName(), startDesc); + // TODO + DynamicCacheDescriptor old = null;//cacheDescriptor(ccfg.getName(), startDesc); assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; @@ -3152,7 +2947,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (desc != null) { if (req.stop()) { - DynamicCacheDescriptor old = registeredCaches.remove(maskNull(req.cacheName())); + DynamicCacheDescriptor old = cachesInfo.registeredCaches().remove(maskNull(req.cacheName())); assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']'; @@ -3610,25 +3405,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Descriptor. */ public DynamicCacheDescriptor cacheDescriptor(String name) { - return registeredCaches.get(maskNull(name)); - } - - /** - * Put registered cache descriptor. - * - * @param name Name. - * @param desc Descriptor. - * @return Old descriptor (if any). - */ - private DynamicCacheDescriptor cacheDescriptor(String name, DynamicCacheDescriptor desc) { - return registeredCaches.put(maskNull(name), desc); + return cachesInfo.registeredCaches().get(name); } /** * @return Cache descriptors. */ public Collection cacheDescriptors() { - return registeredCaches.values(); + return cachesInfo.registeredCaches().values(); } /** @@ -3655,7 +3439,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { public void addCacheConfiguration(CacheConfiguration cacheCfg) throws IgniteCheckedException { String masked = maskNull(cacheCfg.getName()); - DynamicCacheDescriptor desc = registeredTemplates.get(masked); + DynamicCacheDescriptor desc = cachesInfo.registeredTemplates().get(masked); if (desc != null) return; @@ -3833,7 +3617,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException In case of error. */ public void createMissingQueryCaches() throws IgniteCheckedException { - for (Map.Entry e : registeredCaches.entrySet()) { + for (Map.Entry e : cachesInfo.registeredCaches().entrySet()) { DynamicCacheDescriptor desc = e.getValue(); if (isMissingQueryCache(desc))