Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0312E200CB9 for ; Sun, 2 Jul 2017 22:14:26 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0205D160BEE; Sun, 2 Jul 2017 20:14:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F280D160BDD for ; Sun, 2 Jul 2017 22:14:23 +0200 (CEST) Received: (qmail 60045 invoked by uid 500); 2 Jul 2017 20:14:23 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 60035 invoked by uid 99); 2 Jul 2017 20:14:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 02 Jul 2017 20:14:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0F22AE0921; Sun, 2 Jul 2017 20:14:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <47aca0b9924945e39cf4dbb8f0f9ae5b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: gg-12389 Date: Sun, 2 Jul 2017 20:14:23 +0000 (UTC) archived-at: Sun, 02 Jul 2017 20:14:26 -0000 Repository: ignite Updated Branches: refs/heads/ignite-gg-12389 108530e67 -> 7c53948cd gg-12389 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7c53948c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7c53948c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7c53948c Branch: refs/heads/ignite-gg-12389 Commit: 7c53948cdb786563f4ee2c92aea8985dbf9eee50 Parents: 108530e Author: sboikov Authored: Fri Jun 30 22:29:13 2017 +0300 Committer: sboikov Committed: Sun Jul 2 23:11:35 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 16 ++ .../pagemem/store/IgnitePageStoreManager.java | 3 +- .../cache/CacheAffinitySharedManager.java | 4 +- .../processors/cache/ClusterCachesInfo.java | 172 ++++++++++++++----- .../cache/DynamicCacheChangeRequest.java | 1 - .../processors/cache/ExchangeActions.java | 4 +- .../GridCachePartitionExchangeManager.java | 48 +++--- .../processors/cache/GridCacheProcessor.java | 9 +- .../GridDhtPartitionsExchangeFuture.java | 5 +- .../GridCacheDatabaseSharedManager.java | 55 +----- .../IgniteCacheDatabaseSharedManager.java | 7 + .../persistence/file/FilePageStoreManager.java | 12 +- .../cluster/ChangeGlobalStateMessage.java | 14 ++ .../cluster/GridClusterStateProcessor.java | 120 +++++++++---- .../processors/query/GridQueryProcessor.java | 4 +- .../IgniteClusterActivateDeactivateTest.java | 59 +++++-- ...erActivateDeactivateTestWithPersistence.java | 43 +++++ .../pagemem/NoOpPageStoreManager.java | 7 +- 18 files changed, 394 insertions(+), 189 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index b9b1d73..a899cd0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -617,6 +617,10 @@ public class GridDiscoveryManager extends GridManagerAdapter { else if (customMsg instanceof ChangeGlobalStateFinishMessage) { ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg); + discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot); + + topSnap.set(new Snapshot(topSnap.get().topVer, discoCache)); + incMinorTopVer = false; } else { @@ -2374,6 +2378,9 @@ public class GridDiscoveryManager extends GridManagerAdapter { /** Worker for discovery events. */ private class DiscoveryWorker extends GridWorker { + /** */ + private DiscoCache discoCache; + /** Event queue. */ private final BlockingQueue, DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>(); @@ -2492,6 +2499,9 @@ public class GridDiscoveryManager extends GridManagerAdapter { boolean segmented = false; + if (evt.get4() != null) + discoCache = evt.get4(); + switch (type) { case EVT_NODE_JOINED: { assert !discoOrdered || topVer.topologyVersion() == node.order() : "Invalid topology version [topVer=" + topVer + @@ -2616,6 +2626,12 @@ public class GridDiscoveryManager extends GridManagerAdapter { customEvt.affinityTopologyVersion(topVer); customEvt.customMessage(evt.get6()); + if (evt.get4() == null) { + assert discoCache != null; + + evt.set4(discoCache); + } + ctx.event().record(customEvt, evt.get4()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java index 468d35d..fa6e9e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java @@ -183,11 +183,10 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh public Map readCacheConfigurations() throws IgniteCheckedException; /** - * @param grpDesc Cache group descriptor. * @param cacheData Cache configuration. * @throws IgniteCheckedException If failed. */ - public void storeCacheData(CacheGroupDescriptor grpDesc, StoredCacheData cacheData) throws IgniteCheckedException; + public void storeCacheData(StoredCacheData cacheData) throws IgniteCheckedException; /** * @param grpId Cache group ID. * @return {@code True} if index store for given cache group existed before node started. http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 438891a..a208641 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -737,11 +737,11 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap try { // Save configuration before cache started. - if (cctx.pageStore() != null && !cctx.localNode().isClient()) + if (cctx.pageStore() != null && !cctx.kernalContext().clientNode()) { cctx.pageStore().storeCacheData( - cacheDesc.groupDescriptor(), new StoredCacheData(req.startCacheConfiguration()) ); + } if (startCache) { cctx.cache().prepareCacheStart(req.startCacheConfiguration(), http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index f8aa6c6..cbbd2d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.query.QuerySchema; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; @@ -156,7 +157,9 @@ class ClusterCachesInfo { if (gridData != null && gridData.conflictErr != null) throw new IgniteCheckedException(gridData.conflictErr); - if (joinDiscoData != null && gridData != null) { + if (gridData != null && gridData.joinDiscoData != null) { + CacheJoinNodeDiscoveryData joinDiscoData = gridData.joinDiscoData; + for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) { CacheConfiguration locCfg = locCacheInfo.cacheData().config(); @@ -179,7 +182,6 @@ class ClusterCachesInfo { } } - joinDiscoData = null; gridData = null; } @@ -323,23 +325,42 @@ class ClusterCachesInfo { } } } - /** * @param batch Cache change request. * @param topVer Topology version. * @return {@code True} if minor topology version should be increased. */ boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) { - // TODO GG-12389 fail if no active (for others msgs too). + // TODO GG-12389 fail if inactive (for others msgs too). ExchangeActions exchangeActions = new ExchangeActions(); - boolean incMinorTopVer = false; + final CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions, batch.requests(), topVer, false); - List addedDescs = new ArrayList<>(); + if (res.needExchange) { + assert !exchangeActions.empty() : exchangeActions; + + batch.exchangeActions(exchangeActions); + } + + return res.needExchange; + } + + /** + * @param exchangeActions Exchange actions to update. + * @param reqs Requests. + * @param topVer Topology version. + * @return {@code True} if minor topology version should be increased. + */ + CacheChangeProcessResult processCacheChangeRequests( + ExchangeActions exchangeActions, + Collection reqs, + AffinityTopologyVersion topVer, + boolean onClusterActivate) { + CacheChangeProcessResult res = new CacheChangeProcessResult(); final List> reqsToComplete = new ArrayList<>(); - for (DynamicCacheChangeRequest req : batch.requests()) { + for (DynamicCacheChangeRequest req : reqs) { if (req.template()) { CacheConfiguration ccfg = req.startCacheConfiguration(); @@ -363,10 +384,11 @@ class ClusterCachesInfo { assert old == null; - addedDescs.add(templateDesc); + res.addedDescs.add(templateDesc); } - ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId()); + if (!onClusterActivate) + ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId()); continue; } @@ -389,22 +411,32 @@ class ClusterCachesInfo { if (conflictErr != null) { U.warn(log, "Ignore cache start request. " + conflictErr); - ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " + - "cache. " + conflictErr)); + IgniteCheckedException err = new IgniteCheckedException("Failed to start " + + "cache. " + conflictErr); + + if (onClusterActivate) + res.errs.add(err); + else + ctx.cache().completeCacheStartFuture(req, false, err); continue; } if (req.clientStartOnly()) { + assert !onClusterActivate; + ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " + "client cache (a cache with the given name is not started): " + req.cacheName())); } else { SchemaOperationException err = QueryUtils.checkQueryEntityConflicts( - req.startCacheConfiguration(), ctx.cache().cacheDescriptors().values()); + req.startCacheConfiguration(), registeredCaches.values()); if (err != null) { - ctx.cache().completeCacheStartFuture(req, false, err); + if (onClusterActivate) + res.errs.add(err); + else + ctx.cache().completeCacheStartFuture(req, false, err); continue; } @@ -446,11 +478,15 @@ class ClusterCachesInfo { ccfg.getName(), ccfg.getNearConfiguration() != null); - ctx.discovery().addClientNode(req.cacheName(), - req.initiatingNodeId(), - req.nearCacheConfiguration() != null); + if (req.initiatingNodeId() != null) { + ctx.discovery().addClientNode(req.cacheName(), + req.initiatingNodeId(), + req.nearCacheConfiguration() != null); + } + else + assert onClusterActivate; - addedDescs.add(startDesc); + res.addedDescs.add(startDesc); exchangeActions.addCacheToStart(req, startDesc); @@ -458,6 +494,7 @@ class ClusterCachesInfo { } } else { + assert !onClusterActivate; assert req.initiatingNodeId() != null : req; if (req.failIfExists()) { @@ -577,14 +614,14 @@ class ClusterCachesInfo { reqsToComplete.add(new T2<>(req, waitTopVer)); } else - incMinorTopVer = true; + res.needExchange = true; } - if (!F.isEmpty(addedDescs)) { - AffinityTopologyVersion startTopVer = incMinorTopVer ? topVer.nextMinorVersion() : topVer; + if (!F.isEmpty(res.addedDescs)) { + AffinityTopologyVersion startTopVer = res.needExchange ? topVer.nextMinorVersion() : topVer; - for (DynamicCacheDescriptor desc : addedDescs) { - assert desc.template() || incMinorTopVer; + for (DynamicCacheDescriptor desc : res.addedDescs) { + assert desc.template() || res.needExchange; desc.startTopologyVersion(startTopVer); } @@ -616,13 +653,7 @@ class ClusterCachesInfo { }); } - if (incMinorTopVer) { - assert !exchangeActions.empty() : exchangeActions; - - batch.exchangeActions(exchangeActions); - } - - return incMinorTopVer; + return res; } /** @@ -951,7 +982,7 @@ class ClusterCachesInfo { } } - gridData = new GridData(cachesData, conflictErr); + gridData = new GridData(joinDiscoData, cachesData, conflictErr); if (!disconnectedState()) initStartCachesForLocalJoin(false); @@ -975,7 +1006,7 @@ class ClusterCachesInfo { locJoinStartCaches = new ArrayList<>(); locCfgsForActivation = new HashMap<>(); - boolean active = ctx.state().clusterState().active() && !ctx.state().clusterState().transition(); + boolean active = ctx.state().clusterState().active(); for (DynamicCacheDescriptor desc : registeredCaches.values()) { if (firstNode && !joinDiscoData.caches().containsKey(desc.cacheName())) @@ -1022,6 +1053,8 @@ class ClusterCachesInfo { locCfgsForActivation.put(desc.cacheName(), new T2<>(desc.cacheConfiguration(), nearCfg)); } } + + joinDiscoData = null; } } @@ -1037,25 +1070,24 @@ class ClusterCachesInfo { } /** - * @param exchangeActions Exchange actions to modify. + * @param msg Message. */ - void onStateChangeRequest(ExchangeActions exchangeActions) { - StateChangeRequest stateChangeReq = exchangeActions.stateChangeRequest(); - - assert stateChangeReq != null : exchangeActions; + ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer) { + ExchangeActions exchangeActions = new ExchangeActions(); - if (stateChangeReq.activate()) { + if (msg.activate()) { for (DynamicCacheDescriptor desc : registeredCaches.values()) { - desc.startTopologyVersion(stateChangeReq.topologyVersion()); + desc.startTopologyVersion(topVer); T2 locCfg = locCfgsForActivation.get(desc.cacheName()); - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(stateChangeReq.requestId(), + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(), desc.cacheName(), locCfg != null ? ctx.localNodeId() : null); req.startCacheConfiguration(desc.cacheConfiguration()); + req.cacheType(desc.cacheType()); if (locCfg != null) { if (locCfg.get1() != null) @@ -1069,18 +1101,55 @@ class ClusterCachesInfo { for (CacheGroupDescriptor grpDesc : registeredCacheGroups().values()) exchangeActions.addCacheGroupToStart(grpDesc); + + List storedCfgs = msg.storedCacheConfigurations(); + + if (storedCfgs != null) { + List reqs = new ArrayList<>(); + + for (StoredCacheData storedCfg : storedCfgs) { + CacheConfiguration ccfg = storedCfg.config(); + + if (!registeredCaches.containsKey(ccfg.getName())) { + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(), + ccfg.getName(), + null); + + req.startCacheConfiguration(ccfg); + req.cacheType(ctx.cache().cacheType(ccfg.getName())); + req.schema(new QuerySchema(storedCfg.queryEntities())); + + reqs.add(req); + } + } + + CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions, reqs, topVer, true); + + if (!res.errs.isEmpty()) { + + } + } } else { + locCfgsForActivation = new HashMap<>(); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { - DynamicCacheChangeRequest req = - DynamicCacheChangeRequest.stopRequest(ctx, desc.cacheName(), desc.sql(), false); + DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, + desc.cacheName(), + desc.sql(), + false); exchangeActions.addCacheToStop(req, desc); + + if (ctx.discovery().cacheClientNode(ctx.discovery().localNode(), desc.cacheName())) + locCfgsForActivation.put(desc.cacheName(), new T2<>((CacheConfiguration)null, (NearCacheConfiguration)null)); } for (CacheGroupDescriptor grpDesc : registeredCacheGroups().values()) exchangeActions.addCacheGroupToStop(grpDesc, false); } + + return exchangeActions; } /** @@ -1533,16 +1602,21 @@ class ClusterCachesInfo { */ private static class GridData { /** */ + private final CacheJoinNodeDiscoveryData joinDiscoData; + + /** */ private final CacheNodeCommonDiscoveryData gridData; /** */ private final String conflictErr; /** + * @param joinDiscoData Discovery data collected for local node join. * @param gridData Grid data. * @param conflictErr Cache configuration conflict error. */ - GridData(CacheNodeCommonDiscoveryData gridData, String conflictErr) { + GridData(CacheJoinNodeDiscoveryData joinDiscoData, CacheNodeCommonDiscoveryData gridData, String conflictErr) { + this.joinDiscoData = joinDiscoData; this.gridData = gridData; this.conflictErr = conflictErr; } @@ -1567,4 +1641,18 @@ class ClusterCachesInfo { this.caches = caches; } } + + /** + * + */ + private static class CacheChangeProcessResult { + /** */ + private boolean needExchange; + + /** */ + private final List addedDescs = new ArrayList<>(); + + /** */ + private final List errs = new ArrayList<>(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index 80af649..f2fce18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -97,7 +97,6 @@ public class DynamicCacheChangeRequest implements Serializable { public DynamicCacheChangeRequest(UUID reqId, String cacheName, UUID initiatingNodeId) { assert reqId != null; assert cacheName != null; - assert initiatingNodeId != null; this.reqId = reqId; this.cacheName = cacheName; http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java index 48f9cec..e9ece5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java @@ -89,7 +89,7 @@ public class ExchangeActions { /** * @return New caches start requests. */ - Collection cacheStartRequests() { + public Collection cacheStartRequests() { return cachesToStart != null ? cachesToStart.values() : Collections.emptyList(); } @@ -347,7 +347,7 @@ public class ExchangeActions { /** * */ - static class ActionData { + public static class ActionData { /** */ private final DynamicCacheChangeRequest req; http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 9788ab7..9198967 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -291,35 +291,37 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana return; } + if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT && + (((DiscoveryCustomEvent)evt).customMessage() instanceof ChangeGlobalStateFinishMessage)) { + ChangeGlobalStateFinishMessage stateFinishMsg = + (ChangeGlobalStateFinishMessage)((DiscoveryCustomEvent)evt).customMessage(); - // TODO GG-12389. - if (cache == null) - cache = cctx.discovery().discoCache(); + if (stateFinishMsg.clusterActive()) { + for (PendingDiscoveryEvent pendingEvt : pendingEvts) { + log.info("Process pending event: " + pendingEvt.event()); - if (cache.state().transition()) { - if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT && - (((DiscoveryCustomEvent)evt).customMessage() instanceof ChangeGlobalStateFinishMessage)) { - ChangeGlobalStateFinishMessage stateFinishMsg = - (ChangeGlobalStateFinishMessage)((DiscoveryCustomEvent)evt).customMessage(); - - if (stateFinishMsg.clusterActive()) { - for (PendingDiscoveryEvent pendingEvt : pendingEvts) - onDiscoveryEvent(pendingEvt.event(), pendingEvt.discoCache()); + onDiscoveryEvent(pendingEvt.event(), pendingEvt.discoCache()); } - - pendingEvts.clear(); } else { - if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) - pendingEvts.add(new PendingDiscoveryEvent(evt, cache)); - else { - U.warn(log, "Received custom discovery event while cluster state transition " + - "is in progress: " + evt); - } + for (PendingDiscoveryEvent pendingEvt : pendingEvts) + processEventInactive(pendingEvt.event(), pendingEvt.discoCache()); } + + pendingEvts.clear(); + + return; + } + + if (cache.state().transition()) { + log.info("Add pending event: " + evt); + + pendingEvts.add(new PendingDiscoveryEvent(evt, cache)); } else if (cache.state().active()) onDiscoveryEvent(evt, cache); + else + processEventInactive(evt, cache); if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) { final ClusterNode n = evt.eventNode(); @@ -339,6 +341,12 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } }; + private void processEventInactive(DiscoveryEvent evt, DiscoCache cache) { + log.info("Ignore event: " + evt); + + // TODO GG-12389: finish operations with error. + } + /** * @param evt Event. * @param cache Discovery data cache. http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 9476c7b..52d607f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -108,6 +108,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.plugin.CachePluginManager; @@ -2294,10 +2295,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * @param exchangeActions Exchange actions to modify. + * @param msg Message. */ - public void onStateChangeRequest(ExchangeActions exchangeActions) { - cachesInfo.onStateChangeRequest(exchangeActions); + public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer) { + return cachesInfo.onStateChangeRequest(msg, topVer); } /** @@ -3177,7 +3178,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { proxy = new IgniteCacheProxy(cacheAdapter.context(), cacheAdapter, null, false); } - assert proxy != null; + assert proxy != null : name; return proxy.internalProxy(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 704bc76..6438718 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -571,7 +571,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert exchActions != null && !exchActions.empty(); exchange = onClusterStateChangeRequest(crdNode); - } else if (msg instanceof DynamicCacheChangeBatch) { assert exchActions != null && !exchActions.empty(); @@ -602,6 +601,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.activate(); cctx.cache().startCachesOnLocalJoin(topVer); + + cctx.database().readCheckpointAndRestoreMemory(); } } @@ -748,6 +749,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.affinity().onCacheChangeRequest(this, crd, exchActions); + cctx.database().readCheckpointAndRestoreMemory(); + if (log.isInfoEnabled()) { log.info("Successfully activated caches [nodeId=" + cctx.localNodeId() + ", client=" + cctx.kernalContext().clientNode() + http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 498610f..ceb91ff 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -102,6 +102,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; +import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.StoredCacheData; @@ -437,47 +438,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan // No-op. } - /** - * - */ - private void initCachesAndRestoreMemory() throws IgniteCheckedException { - Collection cacheNames = new HashSet<>(); - - // TODO IGNITE-5075 group descriptors. - for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) { - if (CU.isSystemCache(ccfg.getName())) { - storeMgr.initializeForCache( - cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), - new StoredCacheData(ccfg) - ); - - cacheNames.add(ccfg.getName()); - } - } - - for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) - if (!CU.isSystemCache(ccfg.getName())) { - DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptors().get(ccfg.getName()); - - if (cacheDesc != null) - storeMgr.initializeForCache( - cacheDesc.groupDescriptor(), - new StoredCacheData(ccfg) - ); - - cacheNames.add(ccfg.getName()); - } - - for (StoredCacheData cacheData : cctx.pageStore().readCacheConfigurations().values()) { - if (!cacheNames.contains(cacheData.config().getName())) - storeMgr.initializeForCache( - cctx.cache().cacheDescriptors().get( - cacheData.config().getName()).groupDescriptor(), cacheData); - } - - readCheckpointAndRestoreMemory(); - } - /** {@inheritDoc} */ @Override public void onActivate(GridKernalContext ctx) throws IgniteCheckedException { super.onActivate(ctx); @@ -492,12 +452,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan initDataBase(); registrateMetricsMBean(); - - initCachesAndRestoreMemory(); } - - if (log.isDebugEnabled()) - log.debug("Restore state after activation [nodeId=" + cctx.localNodeId() + " ]"); } /** {@inheritDoc} */ @@ -552,10 +507,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } - /** - * - */ - private void readCheckpointAndRestoreMemory() throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void readCheckpointAndRestoreMemory() throws IgniteCheckedException { checkpointReadLock(); try { @@ -1560,7 +1513,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan Map, T2> partStates ) throws IgniteCheckedException { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal()) + if (grp.isLocal() || !grp.affinityNode()) // Local cache has no partitions and its states. continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index b1f304f..230fffe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -570,6 +570,13 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** + * @throws IgniteCheckedException If failed. + */ + public void readCheckpointAndRestoreMemory() throws IgniteCheckedException { + // No-op. + } + + /** * @param memPlcName Name of {@link MemoryPolicy} to obtain {@link MemoryMetrics} for. * @return {@link MemoryMetrics} snapshot for specified {@link MemoryPolicy} or {@code null} if * no {@link MemoryPolicy} is configured for specified name. http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index 7bf1c36..28bf6e4 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -208,18 +208,17 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen /** {@inheritDoc} */ @Override public void storeCacheData( - CacheGroupDescriptor grpDesc, StoredCacheData cacheData ) throws IgniteCheckedException { - File cacheWorkDir = cacheWorkDirectory(grpDesc, cacheData.config()); + File cacheWorkDir = cacheWorkDirectory(cacheData.config()); File file; checkAndInitCacheWorkDir(cacheWorkDir); assert cacheWorkDir.exists() : "Work directory does not exist: " + cacheWorkDir; - if (grpDesc.sharedGroup()) + if (cacheData.config().getGroupName() != null) file = new File(cacheWorkDir, cacheData.config().getName() + CACHE_DATA_FILENAME); else file = new File(cacheWorkDir, CACHE_DATA_FILENAME); @@ -333,14 +332,13 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** - * @param grpDesc Cache group descriptor. * @param ccfg Cache configuration. * @return Cache work directory. */ - private File cacheWorkDirectory(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) { + private File cacheWorkDirectory(CacheConfiguration ccfg) { String dirName; - if (grpDesc.sharedGroup()) + if (ccfg.getGroupName() != null) dirName = CACHE_GRP_DIR_PREFIX + ccfg.getGroupName(); else dirName = CACHE_DIR_PREFIX + ccfg.getName(); @@ -357,7 +355,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen private CacheStoreHolder initForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) throws IgniteCheckedException { assert !grpDesc.sharedGroup() || ccfg.getGroupName() != null : ccfg.getName(); - File cacheWorkDir = cacheWorkDirectory(grpDesc, ccfg); + File cacheWorkDir = cacheWorkDirectory(ccfg); boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir); http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java index cb02faf..46b7927 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.processors.cluster; +import java.util.List; import java.util.UUID; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.cache.ExchangeActions; +import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -44,6 +46,9 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage { private boolean activate; /** */ + private List storedCfgs; + + /** */ private transient ExchangeActions exchangeActions; /** @@ -52,14 +57,23 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage { public ChangeGlobalStateMessage( UUID requestId, UUID initiatingNodeId, + List storedCfgs, boolean activate ) { this.requestId = requestId; this.initiatingNodeId = initiatingNodeId; + this.storedCfgs = storedCfgs; this.activate = activate; } /** + * @return Stored cache configurations. + */ + @Nullable public List storedCacheConfigurations() { + return storedCfgs; + } + + /** * @return Cache updates to be executed on exchange. */ public ExchangeActions exchangeActions() { http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 3a2811a..6d6623b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -17,20 +17,24 @@ package org.apache.ignite.internal.processors.cluster; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterGroupAdapter; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -41,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse; import org.apache.ignite.internal.processors.cache.StateChangeRequest; +import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -50,6 +55,7 @@ import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.discovery.DiscoveryDataBag; @@ -166,10 +172,14 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { joinFut.onDone(msg.clusterActive()); if (msg.requestId().equals(globalState.transitionRequestId())) { + log.info("Received state change finish message: " + msg.clusterActive()); + globalState = DiscoveryDataClusterState.createState(msg.clusterActive()); ctx.cache().onStateChangeFinish(msg); } + else + U.warn(log, "Received state finish message with unexpected ID: " + msg); } /** @@ -222,18 +232,20 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { if (fut != null) fut.setRemaining(nodeIds, topVer.nextMinorVersion()); + log.info("Start state transition: " + msg.activate()); + globalState = DiscoveryDataClusterState.createTransitionState(msg.activate(), msg.requestId(), topVer, nodeIds); - ExchangeActions exchangeActions = new ExchangeActions(); + ExchangeActions exchangeActions = ctx.cache().onStateChangeRequest(msg, topVer); - StateChangeRequest req = new StateChangeRequest(msg, topVer.nextMinorVersion()); + AffinityTopologyVersion stateChangeTopVer = topVer.nextMinorVersion(); - exchangeActions.stateChangeRequest(req); + StateChangeRequest req = new StateChangeRequest(msg, stateChangeTopVer); - ctx.cache().onStateChangeRequest(exchangeActions); + exchangeActions.stateChangeRequest(req); msg.exchangeActions(exchangeActions); @@ -338,6 +350,14 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { * */ public IgniteInternalFuture changeGlobalState(final boolean activate) { + if (ctx.isDaemon() || ctx.clientNode()) { + GridFutureAdapter fut = new GridFutureAdapter<>(); + + sendCompute(activate, fut); + + return fut; + } + if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) { return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) + " cluster (must invoke the method outside of an active transaction).")); @@ -373,7 +393,28 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { return fut; } - ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.requestId, ctx.localNodeId(), activate); + List storedCfgs = null; + + if (activate && sharedCtx.database().persistenceEnabled()) { + try { + Map cfgs = ctx.cache().context().pageStore().readCacheConfigurations(); + + if (!F.isEmpty(cfgs)) + storedCfgs = new ArrayList<>(cfgs.values()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to read stored cache configurations: " + e, e); + + startedFut.onDone(e); + + return startedFut; + } + } + + ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.requestId, + ctx.localNodeId(), + storedCfgs, + activate); try { ctx.discovery().sendCustomEvent(msg); @@ -392,10 +433,34 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { } /** - * Invoke from exchange future. + * */ - public Exception changeGlobalState(StateChangeRequest req) { - return req.activate() ? onActivate(req.topologyVersion()) : onDeActivate(req.topologyVersion()); + private void sendCompute(boolean activate, final GridFutureAdapter res) { + AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); + + IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute(); + + if (log.isInfoEnabled()) { + log.info("Sending " + prettyStr(activate) + " request from node [id=" + ctx.localNodeId() + + ", topVer=" + topVer + + ", client=" + ctx.clientNode() + + ", daemon" + ctx.isDaemon() + "]"); + } + + IgniteFuture fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate)); + + fut.listen(new CI1() { + @Override public void apply(IgniteFuture fut) { + try { + fut.get(); + + res.onDone(); + } + catch (Exception e) { + res.onDone(e); + } + } + }); } /** @@ -550,26 +615,9 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { * */ private void onFinalDeActivate(final StateChangeRequest req) { - final boolean client = ctx.clientNode(); - - if (log.isInfoEnabled()) - log.info("Successfully performed final deactivation steps [nodeId=" - + ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]"); - - Exception ex = null; - - try { - sharedCtx.deactivate(); - - sharedCtx.affinity().removeAllCacheInfo(); - } - catch (Exception e) { - ex = e; - } - globalState.setTransitionResult(req.requestId(), false); - sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), ex); + sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), null); } /** @@ -713,7 +761,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { /** * @param event Event. */ - public void onDiscoveryEvent(DiscoveryEvent event) { + void onDiscoveryEvent(DiscoveryEvent event) { assert event != null; if (isDone()) @@ -731,7 +779,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { } /** - * + * @param nodesIds Node IDs. + * @param topVer Current topology version. */ void setRemaining(Set nodesIds, AffinityTopologyVersion topVer) { if (log.isDebugEnabled()) { @@ -749,6 +798,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { } /** + * @param nodeId Sender node ID. * @param msg Activation message response. */ public void onResponse(UUID nodeId, GridChangeGlobalStateMessageResponse msg) { @@ -774,7 +824,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { * */ private void onAllReceived() { - Throwable e = new Throwable(); + IgniteCheckedException e = new IgniteCheckedException(); boolean fail = false; @@ -818,23 +868,23 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { /** */ private static final long serialVersionUID = 0L; - /** Activation. */ - private final boolean activation; + /** */ + private final boolean activate; /** Ignite. */ @IgniteInstanceResource private Ignite ignite; /** - * + * @param activate New cluster state. */ - private ClientChangeGlobalStateComputeRequest(boolean activation) { - this.activation = activation; + private ClientChangeGlobalStateComputeRequest(boolean activate) { + this.activate = activate; } /** {@inheritDoc} */ @Override public void run() { - ignite.active(activation); + ignite.active(activate); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index b9060ed..84b8d14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -512,10 +512,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { cacheData.queryEntities(cacheDesc.schema().entities()); - CacheGroupDescriptor grpDesc = ctx.cache().cacheDescriptors().get(cacheData.config().getName()).groupDescriptor(); - try { - ctx.cache().context().pageStore().storeCacheData(grpDesc, cacheData); + ctx.cache().context().pageStore().storeCacheData(cacheData); } catch (IgniteCheckedException e) { throw new IllegalStateException("Failed to persist cache data: " + cacheData.config().getName(), e); http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java index 23f9c69..be9ca9b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java @@ -56,7 +56,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** */ - private static final String CACHE_NAME_PREFIX = "cache-"; + protected static final String CACHE_NAME_PREFIX = "cache-"; /** */ private boolean client; @@ -125,7 +125,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest /** * @throws Exception If failed. */ - public void testActivateSimpleSingleNode() throws Exception { + public void testActivateSimple_SingleNode() throws Exception { activateSimple(1, 0, 0); } @@ -218,7 +218,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest * @param nodes Number of nodes. * @param caches Number of caches. */ - private void checkCaches(int nodes, int caches) { + final void checkCaches(int nodes, int caches) { for (int i = 0; i < nodes; i++) { for (int c = 0; c < caches; c++) { IgniteCache cache = ignite(i).cache(CACHE_NAME_PREFIX + c); @@ -406,7 +406,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest /** * @throws Exception If failed. */ - public void testDeactivateSimpleSingleNode() throws Exception { + public void testDeactivateSimple_SingleNode() throws Exception { deactivateSimple(1, 0, 0); } @@ -471,16 +471,45 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest for (int i = 0; i < srvs + clients; i++) assertFalse(ignite(i).active()); + + checkNoCaches(srvs + clients); + + client = false; + + startGrid(srvs + clients); + + checkNoCaches(srvs + clients + 1); + + client = true; + + startGrid(srvs + clients + 1); + + checkNoCaches(srvs + clients + 2); + + for (int i = 0; i < srvs + clients + 2; i++) + assertFalse(ignite(i).active()); + + ignite(deactivateFrom).active(true); + + for (int i = 0; i < srvs + clients + 2; i++) + assertTrue(ignite(i).active()); + + for (int i = 0; i < srvs; i++) { + for (int c = 0; c < 2; c++) + checkCache(ignite(i), CACHE_NAME_PREFIX + c, true); + } + + checkCaches(srvs + clients + 2, CACHES); } /** * @return Cache configurations. */ - private CacheConfiguration[] cacheConfigurations1() { + final CacheConfiguration[] cacheConfigurations1() { CacheConfiguration[] ccfgs = new CacheConfiguration[2]; - ccfgs[0] = cacheConfigurations(CACHE_NAME_PREFIX + 0, ATOMIC); - ccfgs[1] = cacheConfigurations(CACHE_NAME_PREFIX + 1, TRANSACTIONAL); + ccfgs[0] = cacheConfiguration(CACHE_NAME_PREFIX + 0, ATOMIC); + ccfgs[1] = cacheConfiguration(CACHE_NAME_PREFIX + 1, TRANSACTIONAL); return ccfgs; } @@ -488,13 +517,13 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest /** * @return Cache configurations. */ - private CacheConfiguration[] cacheConfigurations2() { + final CacheConfiguration[] cacheConfigurations2() { CacheConfiguration[] ccfgs = new CacheConfiguration[4]; - ccfgs[0] = cacheConfigurations(CACHE_NAME_PREFIX + 0, ATOMIC); - ccfgs[1] = cacheConfigurations(CACHE_NAME_PREFIX + 1, TRANSACTIONAL); - ccfgs[2] = cacheConfigurations(CACHE_NAME_PREFIX + 2, ATOMIC); - ccfgs[3] = cacheConfigurations(CACHE_NAME_PREFIX + 3, TRANSACTIONAL); + ccfgs[0] = cacheConfiguration(CACHE_NAME_PREFIX + 0, ATOMIC); + ccfgs[1] = cacheConfiguration(CACHE_NAME_PREFIX + 1, TRANSACTIONAL); + ccfgs[2] = cacheConfiguration(CACHE_NAME_PREFIX + 2, ATOMIC); + ccfgs[3] = cacheConfiguration(CACHE_NAME_PREFIX + 3, TRANSACTIONAL); return ccfgs; } @@ -504,7 +533,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest * @param atomicityMode Atomicity mode. * @return Cache configuration. */ - private CacheConfiguration cacheConfigurations(String name, CacheAtomicityMode atomicityMode) { + protected final CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) { CacheConfiguration ccfg = new CacheConfiguration(name); ccfg.setWriteSynchronizationMode(FULL_SYNC); @@ -519,7 +548,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest * @param node Node. * @param exp {@code True} if expect that cache is started on node. */ - private void checkCache(Ignite node, String cacheName, boolean exp) { + void checkCache(Ignite node, String cacheName, boolean exp) { GridCacheAdapter cache = ((IgniteKernal)node).context().cache().internalCache(cacheName); if (exp) @@ -531,7 +560,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest /** * @param nodes Number of nodes. */ - private void checkNoCaches(int nodes) { + final void checkNoCaches(int nodes) { for (int i = 0; i < nodes; i++) { GridCacheProcessor cache = ((IgniteKernal)ignite(i)).context().cache(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java index b26f113..422e31e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Arrays; +import org.apache.ignite.Ignite; import org.apache.ignite.testframework.GridTestUtils; /** @@ -41,4 +43,45 @@ public class IgniteClusterActivateDeactivateTestWithPersistence extends IgniteCl GridTestUtils.deleteDbFiles(); } + + /** + * @throws Exception If failed. + */ + public void testActivateCachesRestore_SingleNode() throws Exception { + activateCachesRestore(1); + } + + /** + * @throws Exception If failed. + */ + public void testActivateCachesRestore_5_Servers() throws Exception { + activateCachesRestore(5); + } + + /** + * @param srvs Number of server nodes. + * @throws Exception If failed. + */ + private void activateCachesRestore(int srvs) throws Exception { + Ignite srv = startGrids(srvs); + + srv.active(true); + + srv.createCaches(Arrays.asList(cacheConfigurations1())); + + stopAllGrids(); + + srv = startGrids(srvs); + + checkNoCaches(srvs); + + srv.active(true); + + for (int i = 0; i < srvs; i++) { + for (int c = 0; c < 2; c++) + checkCache(ignite(i), CACHE_NAME_PREFIX + c, true); + } + + checkCaches(srvs, 2); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java index ec6aaaa..665bb56 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java @@ -172,8 +172,7 @@ public class NoOpPageStoreManager implements IgnitePageStoreManager { } /** {@inheritDoc} */ - @Override public void storeCacheData(CacheGroupDescriptor grpDesc, - StoredCacheData cacheData) throws IgniteCheckedException { + @Override public void storeCacheData(StoredCacheData cacheData) throws IgniteCheckedException { // No-op. } @@ -184,11 +183,11 @@ public class NoOpPageStoreManager implements IgnitePageStoreManager { /** {@inheritDoc} */ @Override public void onActivate(GridKernalContext kctx) { - + // No-op. } /** {@inheritDoc} */ @Override public void onDeActivate(GridKernalContext kctx) { - + // No-op. } }