Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1DDBC200C88 for ; Thu, 18 May 2017 10:54:23 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1C51D160BC4; Thu, 18 May 2017 08:54:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B8511160BDF for ; Thu, 18 May 2017 10:54:20 +0200 (CEST) Received: (qmail 85706 invoked by uid 500); 18 May 2017 08:54:19 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 85384 invoked by uid 99); 18 May 2017 08:54:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 May 2017 08:54:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7E4A1E10F8; Thu, 18 May 2017 08:54:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 18 May 2017 08:54:31 -0000 Message-Id: In-Reply-To: <99f39dc174f24c1483da8d26c998a86f@git.apache.org> References: <99f39dc174f24c1483da8d26c998a86f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/17] ignite git commit: Moved logic related to caches discovery data handling to ClusterCachesInfo. Start of statically configured caches in the same way as dynamic ones: from GridDhtPartitionsExchangeFuture. archived-at: Thu, 18 May 2017 08:54:23 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 87aaee0..06ad62d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -26,7 +25,6 @@ import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.ListIterator; @@ -61,13 +59,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.query.QuerySchema; -import org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage; -import org.apache.ignite.internal.processors.query.schema.SchemaExchangeWorkerTask; -import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask; -import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage; -import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteComponentType; import org.apache.ignite.internal.IgniteInternalFuture; @@ -110,14 +101,23 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.plugin.CachePluginManager; +import org.apache.ignite.internal.processors.query.QuerySchema; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.schema.SchemaExchangeWorkerTask; +import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask; +import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage; +import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; +import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -153,7 +153,6 @@ import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS; import static org.apache.ignite.configuration.DeploymentMode.ISOLATED; import static org.apache.ignite.configuration.DeploymentMode.PRIVATE; import static org.apache.ignite.configuration.DeploymentMode.SHARED; -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC; import static org.apache.ignite.internal.IgniteComponentType.JTA; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; @@ -193,11 +192,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Template configuration add futures. */ private ConcurrentMap pendingTemplateFuts = new ConcurrentHashMap<>(); - /** Dynamic caches. */ - private ConcurrentMap registeredCaches = new ConcurrentHashMap<>(); - - /** Cache templates. */ - private ConcurrentMap registeredTemplates = new ConcurrentHashMap<>(); + /** */ + private ClusterCachesInfo cachesInfo; /** */ private IdentityHashMap sesHolders = new IdentityHashMap<>(); @@ -208,12 +204,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Count down latch for caches. */ private final CountDownLatch cacheStartedLatch = new CountDownLatch(1); - /** */ - private Map cachesOnDisconnect; - - /** */ - private Map clientReconnectReqs; - /** Internal cache names. */ private final Set internalCaches; @@ -391,16 +381,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param cc Cache Configuration. * @return {@code true} if cache is starting on client node and this node is affinity node for the cache. */ - private boolean storesLocallyOnClient(IgniteConfiguration c, - CacheConfiguration cc) { + private boolean storesLocallyOnClient(IgniteConfiguration c, CacheConfiguration cc) { if (c.isClientMode() && c.getMemoryConfiguration() == null) { if (cc.getCacheMode() == LOCAL) return true; - if (ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) - return true; + return ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()); - return false; } else return false; @@ -627,6 +614,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings({"unchecked"}) @Override public void start(boolean activeOnStart) throws IgniteCheckedException { + cachesInfo = new ClusterCachesInfo(ctx); + DeploymentMode depMode = ctx.config().getDeploymentMode(); if (!F.isEmpty(ctx.config().getCacheConfiguration())) { @@ -647,72 +636,42 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheSharedManager mgr : sharedCtx.managers()) mgr.start(sharedCtx); - //if inActivate on start then skip registrate caches - if (!activeOnStart) - return; - - CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); - - registerCacheFromConfig(cfgs); + if (activeOnStart && !ctx.config().isDaemon()) { + Map caches = new HashMap<>(); - registerCacheFromPersistentStore(cfgs); - - if (log.isDebugEnabled()) - log.debug("Started cache processor."); - } + Map templates = new HashMap<>(); - /** - * @param cfgs Cache configurations. - * @throws IgniteCheckedException If failed. - */ - private void registerCacheFromConfig(CacheConfiguration[] cfgs) throws IgniteCheckedException { - for (int i = 0; i < cfgs.length; i++) { - if (ctx.config().isDaemon()) - continue; + addCacheOnJoinFromConfig(caches, templates); - CacheConfiguration cfg = new CacheConfiguration(cfgs[i]); + addCacheOnJoinFromPersistentStore(caches, templates); - cfgs[i] = cfg; // Replace original configuration value. + CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(IgniteUuid.randomUuid(), + caches, + templates, + startAllCachesOnClientStart()); - registerCache(cfg); + cachesInfo.onStart(discoData); } - } - - /** - * @param cfgs Cache configurations. - * @throws IgniteCheckedException If failed. - */ - private void registerCacheFromPersistentStore(CacheConfiguration[] cfgs) throws IgniteCheckedException { - if (sharedCtx.pageStore() != null && - sharedCtx.database().persistenceEnabled() && - !ctx.config().isDaemon()) { - - Set savedCacheNames = sharedCtx.pageStore().savedCacheNames(); - - for (CacheConfiguration cfg : cfgs) - savedCacheNames.remove(cfg.getName()); - - for (String name : internalCaches) - savedCacheNames.remove(name); - - if (!F.isEmpty(savedCacheNames)) { - log.info("Registrate persistent caches: " + savedCacheNames); - - for (String name : savedCacheNames) { - CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name); - - if (cfg != null) - registerCache(cfg); - } - } + else { + cachesInfo.onStart(new CacheJoinNodeDiscoveryData(IgniteUuid.randomUuid(), + Collections.emptyMap(), + Collections.emptyMap(), + false)); } + + if (log.isDebugEnabled()) + log.debug("Started cache processor."); } /** * @param cfg Cache configuration. + * @param caches Caches map. + * @param templates Templates map. * @throws IgniteCheckedException If failed. */ - private void registerCache(CacheConfiguration cfg) throws IgniteCheckedException { + private void addCacheOnJoin(CacheConfiguration cfg, + Map caches, + Map templates) throws IgniteCheckedException { CU.validateCacheName(cfg.getName()); cloneCheckSerializable(cfg); @@ -722,75 +681,88 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Initialize defaults. initialize(cfg, cacheObjCtx); - String cacheName = cfg.getName(); + boolean template = cfg.getName().endsWith("*"); - if (cacheDescriptor(cfg.getName()) != null) { - throw new IgniteCheckedException("Duplicate cache name found (check configuration and " + - "assign unique name to each cache): " + cacheName); - } + if (!template) { + if (caches.containsKey(cfg.getName())) { + throw new IgniteCheckedException("Duplicate cache name found (check configuration and " + + "assign unique name to each cache): " + cfg.getName()); + } - CacheType cacheType; + CacheType cacheType; - if (CU.isUtilityCache(cfg.getName())) + if (CU.isUtilityCache(cfg.getName())) cacheType = CacheType.UTILITY; else if (internalCaches.contains(cfg.getName())) cacheType = CacheType.INTERNAL; else cacheType = CacheType.USER; - if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null) - cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName()); + if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null) + cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName()); - boolean template = cfg.getName() != null && cfg.getName().endsWith("*"); + if (!cacheType.userCache()) + stopSeq.addLast(cfg.getName()); + else + stopSeq.addFirst(cfg.getName()); - DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, - cfg, - cacheType, - template, - IgniteUuid.randomUuid(), - new QuerySchema(cfg.getQueryEntities())); + caches.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, cacheType, (byte)0)); + } + else + templates.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, CacheType.USER, (byte)0)); + } - desc.locallyConfigured(true); - desc.staticallyConfigured(true); - desc.receivedFrom(ctx.localNodeId()); + /** + * @param caches Caches map. + * @param templates Templates map. + * @throws IgniteCheckedException If failed. + */ + private void addCacheOnJoinFromConfig( + Map caches, + Map templates + ) throws IgniteCheckedException { + assert !ctx.config().isDaemon(); - if (!template) { - cacheDescriptor(cfg.getName(), desc); + CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); - ctx.discovery().setCacheFilter( - cfg.getName(), - cfg.getNodeFilter(), - cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED, - cfg.getCacheMode()); + for (int i = 0; i < cfgs.length; i++) { + CacheConfiguration cfg = new CacheConfiguration(cfgs[i]); - ctx.discovery().addClientNode(cfg.getName(), - ctx.localNodeId(), - cfg.getNearConfiguration() != null); + cfgs[i] = cfg; // Replace original configuration value. - if (!cacheType.userCache()) - stopSeq.addLast(cfg.getName()); - else - stopSeq.addFirst(cfg.getName()); + addCacheOnJoin(cfg, caches, templates); } - else { - if (log.isDebugEnabled()) - log.debug("Use cache configuration as template: " + cfg); + } - registeredTemplates.put(cacheName, desc); - } + /** + * @param caches Caches map. + * @param templates Templates map. + * @throws IgniteCheckedException If failed. + */ + private void addCacheOnJoinFromPersistentStore( + Map caches, + Map templates + ) throws IgniteCheckedException { + assert !ctx.config().isDaemon(); - if (cfg.getName() == null) { // Use cache configuration with null name as template. - DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx, - cfg, - cacheType, - true, - IgniteUuid.randomUuid(), - new QuerySchema(cfg.getQueryEntities())); + if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) { + Set savedCacheNames = sharedCtx.pageStore().savedCacheNames(); + + savedCacheNames.removeAll(caches.keySet()); + + savedCacheNames.removeAll(internalCaches); - desc0.locallyConfigured(true); - desc0.staticallyConfigured(true); + if (!F.isEmpty(savedCacheNames)) { + if (log.isInfoEnabled()) + log.info("Register persistent caches: " + savedCacheNames); + + for (String name : savedCacheNames) { + CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name); - registeredTemplates.put(cacheName, desc0); + if (cfg != null) + addCacheOnJoin(cfg, caches, templates); + } + } } } @@ -819,7 +791,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { ClusterNode locNode = ctx.discovery().localNode(); try { - checkConsistency(); + boolean checkConsistency = + !ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK); + + if (checkConsistency) + checkConsistency(); + + cachesInfo.onKernalStart(checkConsistency); boolean currStatus = ctx.state().active(); @@ -865,88 +843,47 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.query().onCacheKernalStart(); - // Start dynamic caches received from collect discovery data. - for (DynamicCacheDescriptor desc : cacheDescriptors()) { - if (ctx.config().isDaemon()) - continue; - - desc.clearRemoteConfigurations(); - - CacheConfiguration ccfg = desc.cacheConfiguration(); - - IgnitePredicate filter = ccfg.getNodeFilter(); - - boolean loc = desc.locallyConfigured(); - - if (loc || (desc.receivedOnDiscovery() && - (startAllCachesOnClientStart() || CU.affinityNode(locNode, filter)))) { - boolean started = desc.onStart(); - - assert started : "Failed to change started flag for locally configured cache: " + desc; - - CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - - CachePluginManager pluginMgr = desc.pluginManager(); - - GridCacheContext ctx = createCache( - ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed()); - - ctx.dynamicDeploymentId(desc.deploymentId()); - - sharedCtx.addCacheContext(ctx); - - GridCacheAdapter cache = ctx.cache(); - - String name = ccfg.getName(); - - caches.put(name, cache); - - startCache(cache, desc.schema()); - - jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false)); - } + for (GridCacheSharedManager mgr : sharedCtx.managers()) { + if (sharedCtx.database() != mgr) + mgr.onKernalStart(false); } } finally { cacheStartedLatch.countDown(); } - // Must call onKernalStart on shared managers after creation of fetched caches. - for (GridCacheSharedManager mgr : sharedCtx.managers()) - if (sharedCtx.database() != mgr) - mgr.onKernalStart(false); - // Escape if start active on start false if (!activeOnStart) return; - for (GridCacheAdapter cache : caches.values()) - onKernalStart(cache); - if (!ctx.config().isDaemon()) ctx.cacheObjects().onUtilityCacheStarted(); ctx.service().onUtilityCacheStarted(); - // Wait for caches in SYNC preload mode. - for (DynamicCacheDescriptor desc : cacheDescriptors()) { - CacheConfiguration cfg = desc.cacheConfiguration(); + final AffinityTopologyVersion startTopVer = + new AffinityTopologyVersion(ctx.discovery().localJoinEvent().topologyVersion(), 0); - IgnitePredicate filter = cfg.getNodeFilter(); + final List syncFuts = new ArrayList<>(caches.size()); - if (desc.locallyConfigured() || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) { - GridCacheAdapter cache = caches.get(cfg.getName()); + sharedCtx.forAllCaches(new CIX1() { + @Override public void applyx(GridCacheContext cctx) throws IgniteCheckedException { + CacheConfiguration cfg = cctx.config(); - if (cache != null) { - if (cfg.getRebalanceMode() == SYNC) { - CacheMode cacheMode = cfg.getCacheMode(); + if (cctx.affinityNode() && + cfg.getRebalanceMode() == SYNC && + startTopVer.equals(cctx.startTopologyVersion())) { + CacheMode cacheMode = cfg.getCacheMode(); - if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0)) - cache.preloader().syncFuture().get(); - } + if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0)) + // Need to wait outside to avoid a deadlock + syncFuts.add(cctx.preloader().syncFuture()); } } - } + }); + + for (int i = 0, size = syncFuts.size(); i < size; i++) + syncFuts.get(i).get(); assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started"; @@ -962,38 +899,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * + * @throws IgniteCheckedException if check failed. */ private void checkConsistency() throws IgniteCheckedException { - if (!ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { - for (ClusterNode n : ctx.discovery().remoteNodes()) { - if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED)) - continue; - - checkTransactionConfiguration(n); - - DeploymentMode locDepMode = ctx.config().getDeploymentMode(); - DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); - - CU.checkAttributeMismatch( - log, null, n.id(), "deploymentMode", "Deployment mode", - locDepMode, rmtDepMode, true); - - for (DynamicCacheDescriptor desc : cacheDescriptors()) { - CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); - - if (rmtCfg != null) { - CacheConfiguration locCfg = desc.cacheConfiguration(); + for (ClusterNode n : ctx.discovery().remoteNodes()) { + if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED)) + continue; - checkCache(locCfg, rmtCfg, n); + checkTransactionConfiguration(n); - // Check plugin cache configurations. - CachePluginManager pluginMgr = desc.pluginManager(); + DeploymentMode locDepMode = ctx.config().getDeploymentMode(); + DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); - pluginMgr.validateRemotes(rmtCfg, n); - } - } - } + CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", + locDepMode, rmtDepMode, true); } } @@ -1034,7 +953,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { stopCache(cache, cancel, false); } - registeredCaches.clear(); + cachesInfo.clearCaches(); } /** @@ -1105,8 +1024,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture reconnectFut) throws IgniteCheckedException { - cachesOnDisconnect = new HashMap<>(registeredCaches); - IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException( ctx.cluster().clientReconnectFuture(), "Failed to execute dynamic cache change request, client node disconnected."); @@ -1133,9 +1050,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.onDisconnected(reconnectFut); - registeredCaches.clear(); - - registeredTemplates.clear(); + cachesInfo.onDisconnect(); } /** {@inheritDoc} */ @@ -1144,24 +1059,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCompoundFuture stopFut = null; - for (final GridCacheAdapter cache : caches.values()) { - String name = cache.name(); - - boolean stopped; - - boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name); + Set stoppedCaches = cachesInfo.onReconnected(); - if (!sysCache) { - DynamicCacheDescriptor oldDesc = cachesOnDisconnect.get(name); - - assert oldDesc != null : "No descriptor for cache: " + name; - - DynamicCacheDescriptor newDesc = cacheDescriptor(name); - - stopped = newDesc == null || !oldDesc.deploymentId().equals(newDesc.deploymentId()); - } - else - stopped = false; + for (final GridCacheAdapter cache : caches.values()) { + boolean stopped = stoppedCaches.contains(cache.name()); if (stopped) { cache.context().gate().reconnected(true); @@ -1188,11 +1089,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { reconnected.add(cache); - if (!sysCache) { + if (cache.context().userCache()) { // Re-create cache structures inside indexing in order to apply recent schema changes. GridCacheContext cctx = cache.context(); - DynamicCacheDescriptor desc = cacheDescriptor(name); + DynamicCacheDescriptor desc = cacheDescriptor(cctx.name()); assert desc != null; @@ -1202,20 +1103,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } - if (clientReconnectReqs != null) { - for (Map.Entry e : clientReconnectReqs.entrySet()) - processClientReconnectData(e.getKey(), e.getValue()); - - clientReconnectReqs = null; - } - sharedCtx.onReconnected(); for (GridCacheAdapter cache : reconnected) cache.context().gate().reconnected(false); - cachesOnDisconnect = null; - if (stopFut != null) stopFut.markInitialized(); @@ -1438,16 +1330,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param cfg Cache configuration to use to create cache. * @param pluginMgr Cache plugin manager. - * @param cacheType Cache type. + * @param desc Cache descriptor. + * @param locStartTopVer Current topology version. * @param cacheObjCtx Cache object context. + * @param affNode {@code True} if local node affinity node. * @param updatesAllowed Updates allowed flag. * @return Cache context. * @throws IgniteCheckedException If failed to create cache. */ private GridCacheContext createCache(CacheConfiguration cfg, @Nullable CachePluginManager pluginMgr, - CacheType cacheType, + DynamicCacheDescriptor desc, + AffinityTopologyVersion locStartTopVer, CacheObjectContext cacheObjCtx, + boolean affNode, boolean updatesAllowed) throws IgniteCheckedException { assert cfg != null; @@ -1465,7 +1361,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { QueryUtils.prepareCacheConfiguration(cfg); - validate(ctx.config(), cfg, cacheType, cfgStore); + validate(ctx.config(), cfg, desc.cacheType(), cfgStore); if (pluginMgr == null) pluginMgr = new CachePluginManager(ctx, cfg); @@ -1475,7 +1371,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.jta().registerCache(cfg); // Skip suggestions for internal caches. - if (cacheType.userCache()) + if (desc.cacheType().userCache()) suggestOptimizations(cfg, cfgStore != null); Collection toPrepare = new ArrayList<>(); @@ -1508,8 +1404,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { storeMgr.initialize(cfgStore, sesHolders); - boolean affNode = cfg.getCacheMode() == LOCAL || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()); - String memPlcName = cfg.getMemoryPolicyName(); MemoryPolicy memPlc = sharedCtx.database().memoryPolicy(memPlcName); @@ -1520,7 +1414,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx, sharedCtx, cfg, - cacheType, + desc.cacheType(), + locStartTopVer, + desc.receivedFrom(), affNode, updatesAllowed, memPlc, @@ -1651,7 +1547,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx, sharedCtx, cfg, - cacheType, + desc.cacheType(), + locStartTopVer, + desc.receivedFrom(), affNode, true, memPlc, @@ -1738,17 +1636,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Collection of started cache names. */ public Collection cacheNames() { - return F.viewReadOnly(cacheDescriptors(), - new IgniteClosure() { - @Override public String apply(DynamicCacheDescriptor desc) { - return desc.cacheConfiguration().getName(); - } - }, - new IgnitePredicate() { - @Override public boolean apply(DynamicCacheDescriptor desc) { - return desc.started(); - } - }); + return F.viewReadOnly(cacheDescriptors(), new IgniteClosure() { + @Override public String apply(DynamicCacheDescriptor desc) { + return desc.cacheConfiguration().getName(); + } + }); } /** @@ -1771,7 +1663,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } if (start) { - for (Map.Entry e : registeredCaches.entrySet()) { + for (Map.Entry e : cachesInfo.registeredCaches().entrySet()) { DynamicCacheDescriptor desc = e.getValue(); CacheConfiguration ccfg = desc.cacheConfiguration(); @@ -1822,140 +1714,138 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * @param req Cache start request. - * @param topVer Topology version. + * @param cacheDesc Cache start request. + * @param nearCfg Near cache configuration. + * @param exchTopVer Current exchange version. * @throws IgniteCheckedException If failed. */ - public void prepareCacheStart(DynamicCacheChangeRequest req, AffinityTopologyVersion topVer) + void prepareCacheStart(DynamicCacheDescriptor cacheDesc, + @Nullable NearCacheConfiguration nearCfg, + AffinityTopologyVersion exchTopVer) throws IgniteCheckedException { - assert req.start() : req; - assert req.cacheType() != null : req; + prepareCacheStart( + cacheDesc.cacheConfiguration(), + nearCfg, + cacheDesc, + exchTopVer, + cacheDesc.schema() + ); + } - DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName()); + /** + * @param exchTopVer Current exchange version. + * @throws IgniteCheckedException If failed. + */ + public void startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException { + List> caches = cachesInfo.cachesToStartOnLocalJoin(); - if (desc != null) - desc.onStart(); + if (!F.isEmpty(caches)) { + for (T2 t : caches) { + DynamicCacheDescriptor desc = t.get1(); - prepareCacheStart( - req.startCacheConfiguration(), - req.nearCacheConfiguration(), - req.cacheType(), - req.clientStartOnly(), - req.initiatingNodeId(), - req.deploymentId(), - topVer, - desc != null ? desc.schema() : null - ); + prepareCacheStart( + desc.cacheConfiguration(), + t.get2(), + desc, + exchTopVer, + desc.schema() + ); + } + } } /** * Starts statically configured caches received from remote nodes during exchange. * - * @param topVer Topology version. + * @param nodeId Joining node ID. + * @param exchTopVer Current exchange version. * @return Started caches descriptors. * @throws IgniteCheckedException If failed. */ - public Collection startReceivedCaches(AffinityTopologyVersion topVer) + public Collection startReceivedCaches(UUID nodeId, AffinityTopologyVersion exchTopVer) throws IgniteCheckedException { - List started = null; - - for (DynamicCacheDescriptor desc : cacheDescriptors()) { - if (!desc.started() && desc.staticallyConfigured() && !desc.locallyConfigured()) { - if (desc.receivedFrom() != null) { - AffinityTopologyVersion startVer = desc.receivedFromStartVersion(); + List started = cachesInfo.cachesReceivedFromJoin(nodeId); - if (startVer == null || startVer.compareTo(topVer) > 0) - continue; - } - - if (desc.onStart()) { - if (started == null) - started = new ArrayList<>(); - - started.add(desc); + if (started != null) { + for (DynamicCacheDescriptor desc : started) { + IgnitePredicate filter = desc.cacheConfiguration().getNodeFilter(); + if (CU.affinityNode(ctx.discovery().localNode(), filter)) { prepareCacheStart( desc.cacheConfiguration(), null, - desc.cacheType(), - false, - null, - desc.deploymentId(), - topVer, + desc, + exchTopVer, desc.schema() ); } } } - return started; + return started != null ? started : Collections.emptyList(); } /** - * @param cfg Start configuration. - * @param nearCfg Near configuration. - * @param cacheType Cache type. - * @param clientStartOnly Client only start request. - * @param initiatingNodeId Initiating node ID. - * @param deploymentId Deployment ID. - * @param topVer Topology version. + * @param startCfg Start configuration. + * @param reqNearCfg Near configuration if specified for client cache start request. + * @param desc Cache descriptor. + * @param exchTopVer Current exchange version. * @param schema Query schema. * @throws IgniteCheckedException If failed. */ private void prepareCacheStart( - CacheConfiguration cfg, - NearCacheConfiguration nearCfg, - CacheType cacheType, - boolean clientStartOnly, - UUID initiatingNodeId, - IgniteUuid deploymentId, - AffinityTopologyVersion topVer, + CacheConfiguration startCfg, + @Nullable NearCacheConfiguration reqNearCfg, + DynamicCacheDescriptor desc, + AffinityTopologyVersion exchTopVer, @Nullable QuerySchema schema ) throws IgniteCheckedException { - CacheConfiguration ccfg = new CacheConfiguration(cfg); - - IgnitePredicate nodeFilter = ccfg.getNodeFilter(); + assert !caches.containsKey(startCfg.getName()) : startCfg.getName(); - ClusterNode locNode = ctx.discovery().localNode(); + CacheConfiguration ccfg = new CacheConfiguration(startCfg); - boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter); - boolean clientNodeStart = locNode.id().equals(initiatingNodeId); + CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null) - return; + boolean affNode; - if (affNodeStart || clientNodeStart || CU.isSystemCache(cfg.getName())) { - if (clientNodeStart && !affNodeStart) { - if (nearCfg != null) - ccfg.setNearConfiguration(nearCfg); - else - ccfg.setNearConfiguration(null); - } + if (ccfg.getCacheMode() == LOCAL) { + affNode = true; - CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); + ccfg.setNearConfiguration(null); + } + else if (CU.affinityNode(ctx.discovery().localNode(), ccfg.getNodeFilter())) + affNode = true; + else { + affNode = false; - GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true); + ccfg.setNearConfiguration(reqNearCfg); + } - cacheCtx.startTopologyVersion(topVer); + GridCacheContext cacheCtx = createCache(ccfg, + null, + desc, + exchTopVer, + cacheObjCtx, + affNode, + true); - cacheCtx.dynamicDeploymentId(deploymentId); + cacheCtx.dynamicDeploymentId(desc.deploymentId()); - GridCacheAdapter cache = cacheCtx.cache(); + GridCacheAdapter cache = cacheCtx.cache(); - sharedCtx.addCacheContext(cacheCtx); + sharedCtx.addCacheContext(cacheCtx); - caches.put(cacheCtx.name(), cache); + caches.put(cacheCtx.name(), cache); - startCache(cache, schema != null ? schema : new QuerySchema()); + startCache(cache, schema != null ? schema : new QuerySchema()); - onKernalStart(cache); - } + onKernalStart(cache); } /** * @param req Stop request. */ - public void blockGateway(DynamicCacheChangeRequest req) { + void blockGateway(DynamicCacheChangeRequest req) { assert req.stop() || req.close(); if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) { @@ -1997,9 +1887,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.removeCacheContext(ctx); - assert req.deploymentId().equals(ctx.dynamicDeploymentId()) : "Different deployment IDs [req=" + req + - ", ctxDepId=" + ctx.dynamicDeploymentId() + ']'; - onKernalStop(cache, req.destroy()); stopCache(cache, true, req.destroy()); @@ -2010,52 +1897,52 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Callback invoked when first exchange future for dynamic cache is completed. * * @param topVer Completed topology version. - * @param reqs Change requests. + * @param exchActions Change requests. * @param err Error. */ @SuppressWarnings("unchecked") public void onExchangeDone( AffinityTopologyVersion topVer, - Collection reqs, + @Nullable ExchangeActions exchActions, Throwable err ) { for (GridCacheAdapter cache : caches.values()) { GridCacheContext cacheCtx = cache.context(); - if (F.eq(cacheCtx.startTopologyVersion(), topVer)) { + if (cacheCtx.startTopologyVersion().equals(topVer)) { + jCacheProxies.put(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); + if (cacheCtx.preloader() != null) cacheCtx.preloader().onInitialExchangeComplete(err); + } + } - String masked = cacheCtx.name(); + if (exchActions != null && err == null) { + for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) { + stopGateway(action.request()); - jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false)); + prepareCacheStop(action.request()); } - } - if (!F.isEmpty(reqs) && err == null) { - for (DynamicCacheChangeRequest req : reqs) { - String masked = req.cacheName(); + for (DynamicCacheChangeRequest req : exchActions.closeRequests(ctx.localNodeId())) { + String cacheName = req.cacheName(); - if (req.stop()) { - stopGateway(req); + IgniteCacheProxy proxy = jCacheProxies.get(cacheName); - prepareCacheStop(req); - } - else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) { - IgniteCacheProxy proxy = jCacheProxies.remove(masked); + if (proxy != null) { + if (proxy.context().affinityNode()) { + GridCacheAdapter cache = caches.get(cacheName); - if (proxy != null) { - if (proxy.context().affinityNode()) { - GridCacheAdapter cache = caches.get(masked); + assert cache != null : cacheName; - if (cache != null) - jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false)); - } - else { - proxy.context().gate().onStopped(); + jCacheProxies.put(cacheName, new IgniteCacheProxy(cache.context(), cache, null, false)); + } + else { + jCacheProxies.remove(cacheName); - prepareCacheStop(req); - } + proxy.context().gate().onStopped(); + + prepareCacheStop(req); } } } @@ -2063,20 +1950,31 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * @param req Request to complete future for. + * @param cacheName Cache name. + * @param deploymentId Future deployment ID. */ - public void completeStartFuture(DynamicCacheChangeRequest req) { - DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId()); - - assert req.deploymentId() != null || req.globalStateChange() || req.resetLostPartitions(); - assert fut == null || fut.deploymentId != null || req.globalStateChange() || req.resetLostPartitions(); + void completeTemplateAddFuture(String cacheName, IgniteUuid deploymentId) { + GridCacheProcessor.TemplateConfigurationFuture fut = + (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(cacheName); - if (fut != null && F.eq(fut.deploymentId(), req.deploymentId()) && - F.eq(req.initiatingNodeId(), ctx.localNodeId())) + if (fut != null && fut.deploymentId().equals(deploymentId)) fut.onDone(); } /** + * @param req Request to complete future for. + * @param err Error if any. + */ + void completeCacheStartFuture(DynamicCacheChangeRequest req, @Nullable Exception err) { + if (req.initiatingNodeId().equals(ctx.localNodeId())) { + DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId()); + + if (fut != null) + fut.onDone(null, err); + } + } + + /** * Creates shared context. * * @param kernalCtx Kernal context. @@ -2132,322 +2030,40 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId())); + cachesInfo.collectJoiningNodeData(dataBag); } /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - dataBag.addNodeSpecificData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId())); + cachesInfo.collectGridNodeData(dataBag); } /** - * @param joiningNodeId Joining node id. + * @return {@code True} if need locally start all existing caches on client node start. */ - private Serializable getDiscoveryData(UUID joiningNodeId) { - boolean reconnect = ctx.localNodeId().equals(joiningNodeId) && cachesOnDisconnect != null; + private boolean startAllCachesOnClientStart() { + return START_CLIENT_CACHES && ctx.clientNode(); + } - // Collect dynamically started caches to a single object. - Collection reqs; + /** {@inheritDoc} */ + @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { + cachesInfo.onJoiningNodeDataReceived(data); + } - Map> clientNodesMap; + /** {@inheritDoc} */ + @Override public void onGridDataReceived(GridDiscoveryData data) { + cachesInfo.onGridDataReceived(data); + } - if (reconnect) { - reqs = new ArrayList<>(caches.size() + 1); - - clientNodesMap = U.newHashMap(caches.size()); - - collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId); - } - else { - reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size() + 1); - - clientNodesMap = ctx.discovery().clientNodesMap(); - - collectDataOnGridNode(reqs); - } - - DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs); - - batch.clientNodes(clientNodesMap); - - batch.clientReconnect(reconnect); - - if (ctx.localNodeId().equals(joiningNodeId)) - batch.startCaches(startAllCachesOnClientStart()); - - // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same. - batch.id(null); - - return batch; - } - - /** - * @param reqs requests. - */ - private void collectDataOnGridNode(Collection reqs) { - for (DynamicCacheDescriptor desc : cacheDescriptors()) { - // RequestId must be null because on different node will be different byte [] and - // we get duplicate discovery data, for more details see - // TcpDiscoveryNodeAddedMessage#addDiscoveryData. - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(), - null); - - req.startCacheConfiguration(desc.cacheConfiguration()); - req.cacheType(desc.cacheType()); - req.deploymentId(desc.deploymentId()); - req.receivedFrom(desc.receivedFrom()); - req.schema(desc.schema()); - - reqs.add(req); - } - - for (DynamicCacheDescriptor desc : registeredTemplates.values()) { - // RequestId must be null because on different node will be different byte [] and - // we get duplicate discovery data, for more details see - // TcpDiscoveryNodeAddedMessage#addDiscoveryData. - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(), - null); - - req.startCacheConfiguration(desc.cacheConfiguration()); - req.schema(desc.schema()); - - req.template(true); - - reqs.add(req); - } - } - - /** - * @param reqs requests. - * @param clientNodesMap Client nodes map. - * @param nodeId Node id. - */ - private void collectDataOnReconnectingNode( - Collection reqs, - Map> clientNodesMap, - UUID nodeId - ) { - for (GridCacheAdapter cache : caches.values()) { - DynamicCacheDescriptor desc = cachesOnDisconnect.get(cache.name()); - - if (desc == null) - continue; - - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, cache.name(), null); - - req.startCacheConfiguration(desc.cacheConfiguration()); - req.cacheType(desc.cacheType()); - req.deploymentId(desc.deploymentId()); - req.receivedFrom(desc.receivedFrom()); - req.schema(desc.schema()); - - reqs.add(req); - - Boolean nearEnabled = cache.isNear(); - - Map map = U.newHashMap(1); - - map.put(nodeId, nearEnabled); - - clientNodesMap.put(cache.name(), map); - } - } - - /** - * @return {@code True} if need locally start all existing caches on client node start. - */ - private boolean startAllCachesOnClientStart() { - return START_CLIENT_CACHES && ctx.clientNode(); - } - - /** {@inheritDoc} */ - @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { - if (data.hasJoiningNodeData()) { - Serializable joiningNodeData = data.joiningNodeData(); - if (joiningNodeData instanceof DynamicCacheChangeBatch) - onDiscoDataReceived( - data.joiningNodeId(), - data.joiningNodeId(), - (DynamicCacheChangeBatch) joiningNodeData, true); - } - } - - /** {@inheritDoc} */ - @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecData = data.nodeSpecificData(); - - if (nodeSpecData != null) { - for (Map.Entry e : nodeSpecData.entrySet()) { - if (e.getValue() != null && e.getValue() instanceof DynamicCacheChangeBatch) { - DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch) e.getValue(); - - onDiscoDataReceived(data.joiningNodeId(), e.getKey(), batch, false); - } - } - } - } - - /** - * @param joiningNodeId Joining node id. - * @param rmtNodeId Rmt node id. - * @param batch Batch. - * @param join Whether this is data from joining node. - */ - private void onDiscoDataReceived(UUID joiningNodeId, UUID rmtNodeId, DynamicCacheChangeBatch batch, boolean join) { - if (batch.clientReconnect()) { - if (ctx.clientDisconnected()) { - if (clientReconnectReqs == null) - clientReconnectReqs = new LinkedHashMap<>(); - - clientReconnectReqs.put(joiningNodeId, batch); - - return; - } - - processClientReconnectData(joiningNodeId, batch); - } - else { - for (DynamicCacheChangeRequest req : batch.requests()) { - initReceivedCacheConfiguration(req); - - if (req.template()) { - CacheConfiguration ccfg = req.startCacheConfiguration(); - - assert ccfg != null : req; - - DynamicCacheDescriptor existing = registeredTemplates.get(req.cacheName()); - - if (existing == null) { - DynamicCacheDescriptor desc = new DynamicCacheDescriptor( - ctx, - ccfg, - req.cacheType(), - true, - req.deploymentId(), - req.schema()); - - registeredTemplates.put(req.cacheName(), desc); - } - - continue; - } - - DynamicCacheDescriptor existing = cacheDescriptor(req.cacheName()); - - if (req.start() && !req.clientStartOnly()) { - CacheConfiguration ccfg = req.startCacheConfiguration(); - - if (existing != null) { - if (joiningNodeId.equals(ctx.localNodeId())) { - existing.receivedFrom(req.receivedFrom()); - existing.deploymentId(req.deploymentId()); - } - - if (existing.locallyConfigured()) { - existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration()); - - if (!join) - // Overwrite existing with remote. - existing.schema(req.schema()); - - ctx.discovery().setCacheFilter( - req.cacheName(), - ccfg.getNodeFilter(), - ccfg.getNearConfiguration() != null, - ccfg.getCacheMode()); - } - } - else { - assert req.cacheType() != null : req; - - DynamicCacheDescriptor desc = new DynamicCacheDescriptor( - ctx, - ccfg, - req.cacheType(), - false, - req.deploymentId(), - req.schema()); - - // Received statically configured cache. - if (req.initiatingNodeId() == null) - desc.staticallyConfigured(true); - - if (joiningNodeId.equals(ctx.localNodeId())) - desc.receivedOnDiscovery(true); - - desc.receivedFrom(req.receivedFrom()); - - DynamicCacheDescriptor old = cacheDescriptor(req.cacheName(), desc); - - assert old == null : old; - - ctx.discovery().setCacheFilter( - req.cacheName(), - ccfg.getNodeFilter(), - ccfg.getNearConfiguration() != null, - ccfg.getCacheMode()); - } - } - } - - if (!F.isEmpty(batch.clientNodes())) { - for (Map.Entry> entry : batch.clientNodes().entrySet()) { - String cacheName = entry.getKey(); - - for (Map.Entry tup : entry.getValue().entrySet()) - ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue()); - } - } - - if (batch.startCaches()) { - for (Map.Entry entry : registeredCaches.entrySet()) - ctx.discovery().addClientNode(entry.getKey(), joiningNodeId, false); - } - } - } - - /** - * @param clientNodeId Client node ID. - * @param batch Cache change batch. - */ - private void processClientReconnectData(UUID clientNodeId, DynamicCacheChangeBatch batch) { - assert batch.clientReconnect() : batch; - - for (DynamicCacheChangeRequest req : batch.requests()) { - assert !req.template() : req; - - initReceivedCacheConfiguration(req); - - String name = req.cacheName(); - - boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name); - - if (!sysCache) { - DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName()); - - if (desc != null && desc.deploymentId().equals(req.deploymentId())) { - Map nodes = batch.clientNodes().get(name); - - assert nodes != null : req; - assert nodes.containsKey(clientNodeId) : nodes; - - ctx.discovery().addClientNode(req.cacheName(), clientNodeId, nodes.get(clientNodeId)); - } - } - else - ctx.discovery().addClientNode(req.cacheName(), clientNodeId, false); - } - } - - /** - * Dynamically starts cache using template configuration. - * - * @param cacheName Cache name. - * @return Future that will be completed when cache is deployed. - */ - public IgniteInternalFuture createFromTemplate(String cacheName) { - try { - CacheConfiguration cfg = createConfigFromTemplate(cacheName); + /** + * Dynamically starts cache using template configuration. + * + * @param cacheName Cache name. + * @return Future that will be completed when cache is deployed. + */ + public IgniteInternalFuture createFromTemplate(String cacheName) { + try { + CacheConfiguration cfg = createConfigFromTemplate(cacheName); return dynamicStartCache(cfg, cacheName, null, true, true, true); } @@ -2491,7 +2107,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { List wildcardNameCfgs = null; - for (DynamicCacheDescriptor desc : registeredTemplates.values()) { + for (DynamicCacheDescriptor desc : cachesInfo.registeredTemplates().values()) { assert desc.template(); CacheConfiguration cfg = desc.cacheConfiguration(); @@ -2703,12 +2319,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (checkThreadTx) checkEmptyTransactions(); - DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId()); - - t.stop(true); - t.destroy(true); + DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, true); - return F.first(initiateCacheChanges(F.asList(t), false)); + return F.first(initiateCacheChanges(F.asList(req), false)); } /** @@ -2723,11 +2336,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { List reqs = new ArrayList<>(cacheNames.size()); for (String cacheName : cacheNames) { - DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId()); + DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, true); - t.stop(true); - - reqs.add(t); + reqs.add(req); } GridCompoundFuture compoundFut = new GridCompoundFuture<>(); @@ -2744,7 +2355,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param cacheName Cache name to close. * @return Future that will be completed when cache is closed. */ - public IgniteInternalFuture dynamicCloseCache(String cacheName) { + IgniteInternalFuture dynamicCloseCache(String cacheName) { assert cacheName != null; IgniteCacheProxy proxy = jCacheProxies.get(cacheName); @@ -2754,9 +2365,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { checkEmptyTransactions(); - DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId()); - - t.close(true); + DynamicCacheChangeRequest t = DynamicCacheChangeRequest.closeRequest(ctx, cacheName); return F.first(initiateCacheChanges(F.asList(t), false)); } @@ -2771,7 +2380,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { checkEmptyTransactions(); if (F.isEmpty(cacheNames)) - cacheNames = registeredCaches.keySet(); + cacheNames = cachesInfo.registeredCaches().keySet(); Collection reqs = new ArrayList<>(cacheNames.size()); @@ -2779,16 +2388,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = cacheDescriptor(cacheName); if (desc == null) { - log.warning("Reset lost partition will not be executed, " + - "because cache with name:" + cacheName + " doesn't not exist"); + U.warn(log, "Failed to find cache for reset lost partition request, cache does not exist: " + cacheName); continue; } - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest( - UUID.randomUUID(), cacheName, ctx.localNodeId()); - - req.markResetLostPartitions(); + DynamicCacheChangeRequest req = DynamicCacheChangeRequest.resetLostPartitions(ctx, cacheName); reqs.add(req); } @@ -2841,14 +2446,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { List reqs = new ArrayList<>(); for (String cacheName : cacheNames()) { - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest( - UUID.randomUUID(), cacheName, ctx.localNodeId()); - - DynamicCacheDescriptor desc = cacheDescriptor(cacheName); - - req.deploymentId(desc.deploymentId()); - req.stop(true); - req.destroy(false); + DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, false); reqs.add(req); } @@ -2876,11 +2474,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { String cacheName = cfg.getName(); - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest( - UUID.randomUUID(), cacheName, ctx.localNodeId()); + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId()); req.startCacheConfiguration(cfg); - req.template(cfg.getName() != null && cfg.getName().endsWith("*")); + req.template(cfg.getName().endsWith("*")); req.nearCacheConfiguration(cfg.getNearConfiguration()); req.deploymentId(IgniteUuid.randomUuid()); req.schema(new QuerySchema(cfg.getQueryEntities())); @@ -2910,7 +2507,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { Collection sndReqs = new ArrayList<>(reqs.size()); for (DynamicCacheChangeRequest req : reqs) { - DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req.deploymentId(), req); + DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req); try { if (req.stop() || req.close()) { @@ -2927,14 +2524,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.stop(true); } - - IgniteUuid dynamicDeploymentId = desc.deploymentId(); - - assert dynamicDeploymentId != null : desc; - - // Save deployment ID to avoid concurrent stops. - req.deploymentId(dynamicDeploymentId); - fut.deploymentId = dynamicDeploymentId; } } @@ -2944,7 +2533,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheStartFuture old = (DynamicCacheStartFuture)pendingFuts.putIfAbsent( req.requestId(), fut); - assert old == null; //TODO : check failIfExists. + assert old == null; if (fut.isDone()) continue; @@ -2993,12 +2582,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param topVer Topology version. */ public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { - if (type == EVT_NODE_JOINED) { - for (DynamicCacheDescriptor cacheDesc : cacheDescriptors()) { - if (node.id().equals(cacheDesc.receivedFrom())) - cacheDesc.receivedFromStartVersion(topVer); - } - } + cachesInfo.onDiscoveryEvent(type, node, topVer); sharedCtx.affinity().onDiscoveryEvent(type, node, topVer); } @@ -3025,198 +2609,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { return true; if (msg instanceof DynamicCacheChangeBatch) - return onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer); + return cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer); return false; } /** - * @param batch Change request batch. - * @param topVer Current topology version. - * @return {@code True} if minor topology version should be increased. - */ - private boolean onCacheChangeRequested( - DynamicCacheChangeBatch batch, - AffinityTopologyVersion topVer - ) { - AffinityTopologyVersion newTopVer = null; - - boolean incMinorTopVer = false; - - for (DynamicCacheChangeRequest req : batch.requests()) { - initReceivedCacheConfiguration(req); - - if (req.template()) { - CacheConfiguration ccfg = req.startCacheConfiguration(); - - assert ccfg != null : req; - - DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName()); - - if (desc == null) { - DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true, - req.deploymentId(), req.schema()); - - DynamicCacheDescriptor old = registeredTemplates.put(ccfg.getName(), templateDesc); - - assert old == null : - "Dynamic cache map was concurrently modified [new=" + templateDesc + ", old=" + old + ']'; - } - - TemplateConfigurationFuture fut = - (TemplateConfigurationFuture)pendingTemplateFuts.get(ccfg.getName()); - - if (fut != null && fut.deploymentId().equals(req.deploymentId())) - fut.onDone(); - - continue; - } - - DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName()); - - DynamicCacheStartFuture fut = null; - - if (ctx.localNodeId().equals(req.initiatingNodeId())) { - fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId()); - - if (fut != null && !F.eq(req.deploymentId(), fut.deploymentId())) - fut = null; - } - - boolean needExchange = false; - - if (req.start()) { - if (desc == null) { - if (req.clientStartOnly()) { - if (fut != null) - fut.onDone(new IgniteCheckedException("Failed to start client cache " + - "(a cache with the given name is not started): " + U.maskName(req.cacheName()))); - } - else { - CacheConfiguration ccfg = req.startCacheConfiguration(); - - assert req.cacheType() != null : req; - assert F.eq(ccfg.getName(), req.cacheName()) : req; - - DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, - req.deploymentId(), req.schema()); - - if (newTopVer == null) { - newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(), - topVer.minorTopologyVersion() + 1); - } - - startDesc.startTopologyVersion(newTopVer); - - DynamicCacheDescriptor old = cacheDescriptor(ccfg.getName(), startDesc); - - assert old == null : - "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; - - ctx.discovery().setCacheFilter( - ccfg.getName(), - ccfg.getNodeFilter(), - ccfg.getNearConfiguration() != null, - ccfg.getCacheMode()); - - ctx.discovery().addClientNode(req.cacheName(), - req.initiatingNodeId(), - req.nearCacheConfiguration() != null); - - needExchange = true; - } - } - else { - assert req.initiatingNodeId() != null : req; - - // Cache already exists, exchange is needed only if client cache should be created. - ClusterNode node = ctx.discovery().node(req.initiatingNodeId()); - - boolean clientReq = node != null && - !ctx.discovery().cacheAffinityNode(node, req.cacheName()); - - if (req.clientStartOnly()) { - needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(), - req.initiatingNodeId(), - req.nearCacheConfiguration() != null); - } - else { - if (req.failIfExists()) { - if (fut != null) - fut.onDone(new CacheExistsException("Failed to start cache " + - "(a cache with the same name is already started): " + U.maskName(req.cacheName()))); - } - else { - needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(), - req.initiatingNodeId(), - req.nearCacheConfiguration() != null); - - if (needExchange) - req.clientStartOnly(true); - } - } - - if (needExchange) { - if (newTopVer == null) { - newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(), - topVer.minorTopologyVersion() + 1); - } - - desc.clientCacheStartVersion(newTopVer); - } - } - - if (!needExchange && desc != null) { - if (desc.clientCacheStartVersion() != null) - req.cacheFutureTopologyVersion(desc.clientCacheStartVersion()); - else - req.cacheFutureTopologyVersion(desc.startTopologyVersion()); - } - } - else if (req.globalStateChange() || req.resetLostPartitions()) - needExchange = true; - else { - assert req.stop() ^ req.close() : req; - - if (desc != null) { - if (req.stop()) { - DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName()); - - assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']'; - - ctx.discovery().removeCacheFilter(req.cacheName()); - - needExchange = true; - } - else { - assert req.close() : req; - - needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId()); - } - } - } - - req.exchangeNeeded(needExchange); - - incMinorTopVer |= needExchange; - } - - return incMinorTopVer; - } - - /** - * @param req Cache change request. - */ - private void initReceivedCacheConfiguration(DynamicCacheChangeRequest req) { - if (req.startCacheConfiguration() != null) { - CacheConfiguration ccfg = req.startCacheConfiguration(); - - if (ccfg.isStoreKeepBinary() == null) - ccfg.setStoreKeepBinary(CacheConfiguration.DFLT_STORE_KEEP_BINARY); - } - } - - /** * Checks that preload-order-dependant caches has SYNC or ASYNC preloading mode. * * @param cfgs Caches. @@ -3291,110 +2689,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * Checks that remote caches has configuration compatible with the local. - * - * @param locCfg Local configuration. - * @param rmtCfg Remote configuration. - * @param rmtNode Remote node. - * @throws IgniteCheckedException If check failed. - */ - private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode) throws IgniteCheckedException { - ClusterNode locNode = ctx.discovery().localNode(); - - UUID rmt = rmtNode.id(); - - GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtCfg); - GridCacheAttributes locAttr = new GridCacheAttributes(locCfg); - - boolean isLocAff = CU.affinityNode(locNode, locCfg.getNodeFilter()); - boolean isRmtAff = CU.affinityNode(rmtNode, rmtCfg.getNodeFilter()); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode", - locAttr.cacheMode(), rmtAttr.cacheMode(), true); - - if (rmtAttr.cacheMode() != LOCAL) { - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "interceptor", "Cache Interceptor", - locAttr.interceptorClassName(), rmtAttr.interceptorClassName(), true); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "atomicityMode", - "Cache atomicity mode", locAttr.atomicityMode(), rmtAttr.atomicityMode(), true); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode", - "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true); - - boolean checkStore = isLocAff && isRmtAff; - - if (checkStore) - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory", - locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity", "Cache affinity", - locAttr.cacheAffinityClassName(), rmtAttr.cacheAffinityClassName(), true); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinityMapper", - "Cache affinity mapper", locAttr.cacheAffinityMapperClassName(), - rmtAttr.cacheAffinityMapperClassName(), true); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityPartitionsCount", - "Affinity partitions count", locAttr.affinityPartitionsCount(), - rmtAttr.affinityPartitionsCount(), true); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionFilter", "Eviction filter", - locAttr.evictionFilterClassName(), rmtAttr.evictionFilterClassName(), true); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionPolicy", "Eviction policy", - locAttr.evictionPolicyClassName(), rmtAttr.evictionPolicyClassName(), true); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "transactionManagerLookup", - "Transaction manager lookup", locAttr.transactionManagerLookupClassName(), - rmtAttr.transactionManagerLookupClassName(), false); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultLockTimeout", - "Default lock timeout", locAttr.defaultLockTimeout(), rmtAttr.defaultLockTimeout(), false); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "preloadBatchSize", - "Preload batch size", locAttr.rebalanceBatchSize(), rmtAttr.rebalanceBatchSize(), false); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeSynchronizationMode", - "Write synchronization mode", locAttr.writeSynchronization(), rmtAttr.writeSynchronization(), - true); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindBatchSize", - "Write behind batch size", locAttr.writeBehindBatchSize(), rmtAttr.writeBehindBatchSize(), - false); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindEnabled", - "Write behind enabled", locAttr.writeBehindEnabled(), rmtAttr.writeBehindEnabled(), false); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushFrequency", - "Write behind flush frequency", locAttr.writeBehindFlushFrequency(), - rmtAttr.writeBehindFlushFrequency(), false); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushSize", - "Write behind flush size", locAttr.writeBehindFlushSize(), rmtAttr.writeBehindFlushSize(), - false); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushThreadCount", - "Write behind flush thread count", locAttr.writeBehindFlushThreadCount(), - rmtAttr.writeBehindFlushThreadCount(), false); - - if (locAttr.cacheMode() == PARTITIONED) { - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "nearEvictionPolicy", - "Near eviction policy", locAttr.nearEvictionPolicyClassName(), - rmtAttr.nearEvictionPolicyClassName(), false); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityIncludeNeighbors", - "Affinity include neighbors", locAttr.affinityIncludeNeighbors(), - rmtAttr.affinityIncludeNeighbors(), true); - - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityKeyBackups", - "Affinity key backups", locAttr.affinityKeyBackups(), - rmtAttr.affinityKeyBackups(), true); - } - } - } - - /** * @param rmt Remote node to check. * @throws IgniteCheckedException If check failed. */ @@ -3635,27 +2929,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Descriptor. */ public DynamicCacheDescriptor cacheDescriptor(String name) { - return name != null ? registeredCaches.get(name) : null; - } - - /** - * Put registered cache descriptor. - * - * @param name Name. - * @param desc Descriptor. - * @return Old descriptor (if any). - */ - private DynamicCacheDescriptor cacheDescriptor(String name, DynamicCacheDescriptor desc) { - assert name != null; - - return registeredCaches.put(name, desc); + return cachesInfo.registeredCaches().get(name); } /** * @return Cache descriptors. */ public Collection cacheDescriptors() { - return registeredCaches.values(); + return cachesInfo.registeredCaches().values(); } /** @@ -3682,23 +2963,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { public void addCacheConfiguration(CacheConfiguration cacheCfg) throws IgniteCheckedException { assert cacheCfg.getName() != null; - String masked = cacheCfg.getName(); + String name = cacheCfg.getName(); - DynamicCacheDescriptor desc = registeredTemplates.get(masked); + DynamicCacheDescriptor desc = cachesInfo.registeredTemplates().get(name); if (desc != null) return; - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheCfg.getName(), ctx.localNodeId()); - - CacheConfiguration cfg = new CacheConfiguration(cacheCfg); - - req.template(true); - - req.startCacheConfiguration(cfg); - req.schema(new QuerySchema(cfg.getQueryEntities())); - - req.deploymentId(IgniteUuid.randomUuid()); + DynamicCacheChangeRequest req = DynamicCacheChangeRequest.addTemplateRequest(ctx, cacheCfg); TemplateConfigurationFuture fut = new TemplateConfigurationFuture(req.cacheName(), req.deploymentId()); @@ -3749,6 +3021,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param name Cache name. + * @return Cache proxy. + */ + @Nullable public IgniteCacheProxy jcacheProxy(String name) { + return jCacheProxies.get(name); + } + + /** * @return All configured public cache instances. */ public Collection> publicCaches() { @@ -3859,7 +3139,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException In case of error. */ public void createMissingQueryCaches() throws IgniteCheckedException { - for (Map.Entry e : registeredCaches.entrySet()) { + for (Map.Entry e : cachesInfo.registeredCaches().entrySet()) { DynamicCacheDescriptor desc = e.getValue(); if (isMissingQueryCache(desc)) @@ -4171,10 +3451,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") private class DynamicCacheStartFuture extends GridFutureAdapter { - /** Start ID. */ - @GridToStringInclude - private IgniteUuid deploymentId; - /** Cache name. */ private String cacheName; @@ -4184,23 +3460,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param cacheName Cache name. - * @param deploymentId Deployment ID. * @param req Cache start request. */ - private DynamicCacheStartFuture(String cacheName, IgniteUuid deploymentId, DynamicCacheChangeRequest req) { - this.deploymentId = deploymentId; + private DynamicCacheStartFuture(String cacheName, DynamicCacheChangeRequest req) { this.cacheName = cacheName; this.req = req; } /** - * @return Start ID. - */ - public IgniteUuid deploymentId() { - return deploymentId; - } - - /** * @return Request. */ public DynamicCacheChangeRequest request() { http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index f3c3a1b..1de64c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -281,7 +281,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); // If this is the oldest node. - if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) { + if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheStarted(cacheId)) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java index 94f11ed..f80adc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java @@ -23,7 +23,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.jetbrains.annotations.NotNull; /** * Affinity assignment request. @@ -32,6 +31,9 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { /** */ private static final long serialVersionUID = 0L; + /** */ + private long futId; + /** Topology version being queried. */ private AffinityTopologyVersion topVer; @@ -43,14 +45,28 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { } /** + * @param futId Future ID. * @param cacheId Cache ID. * @param topVer Topology version. */ - public GridDhtAffinityAssignmentRequest(int cacheId, @NotNull AffinityTopologyVersion topVer) { + public GridDhtAffinityAssignmentRequest( + long futId, + int cacheId, + AffinityTopologyVersion topVer) { + assert topVer != null; + + this.futId = futId; this.cacheId = cacheId; this.topVer = topVer; } + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + /** {@inheritDoc} */ @Override public boolean addDeploymentInfo() { return false; @@ -75,7 +91,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 4; + return 5; } /** {@inheritDoc} */ @@ -94,6 +110,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { switch (writer.state()) { case 3: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 4: if (!writer.writeMessage("topVer", topVer)) return false; @@ -116,6 +138,14 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { switch (reader.state()) { case 3: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: topVer = reader.readMessage("topVer"); if (!reader.isLastRead())