ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [24/40] ignite git commit: IGNITE-5830 - Introduce cache start and stop order during cluster activation
Date Thu, 27 Jul 2017 12:34:21 GMT
IGNITE-5830 - Introduce cache start and stop order during cluster activation


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7915fd88
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7915fd88
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7915fd88

Branch: refs/heads/ignite-5757
Commit: 7915fd88b1f3e399777bbc46f4e5625b68fb90c9
Parents: 85f1702
Author: Jokser <jokserfn@gmail.com>
Authored: Wed Jul 26 12:08:03 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Wed Jul 26 12:08:03 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/ClusterCachesInfo.java     | 135 ++++++++++++-------
 .../processors/cache/ExchangeActions.java       |   4 +-
 .../processors/cache/GridCacheProcessor.java    |  29 ++--
 3 files changed, 106 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7915fd88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 949bc19..1a05b96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -58,6 +59,7 @@ import org.apache.ignite.plugin.CachePluginContext;
 import org.apache.ignite.plugin.CachePluginProvider;
 import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
@@ -97,10 +99,10 @@ class ClusterCachesInfo {
     private GridData gridData;
 
     /** */
-    private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
+    private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches
= Collections.emptyList();
 
     /** */
-    private Map<String, T2<CacheConfiguration, NearCacheConfiguration>> locCfgsForActivation;
+    private Map<String, T2<CacheConfiguration, NearCacheConfiguration>> locCfgsForActivation
= Collections.emptyMap();
 
     /** */
     private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
@@ -111,7 +113,7 @@ class ClusterCachesInfo {
     /**
      * @param ctx Context.
      */
-    ClusterCachesInfo(GridKernalContext ctx) {
+    public ClusterCachesInfo(GridKernalContext ctx) {
         this.ctx = ctx;
 
         log = ctx.log(getClass());
@@ -121,7 +123,7 @@ class ClusterCachesInfo {
      * @param joinDiscoData Information about configured caches and templates.
      * @throws IgniteCheckedException If configuration validation failed.
      */
-    void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheckedException
{
+    public void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheckedException
{
         this.joinDiscoData = joinDiscoData;
 
         Map<String, CacheConfiguration> grpCfgs = new HashMap<>();
@@ -159,7 +161,7 @@ class ClusterCachesInfo {
      * @param checkConsistency {@code True} if need check cache configurations consistency.
      * @throws IgniteCheckedException If failed.
      */
-    void onKernalStart(boolean checkConsistency) throws IgniteCheckedException {
+    public void onKernalStart(boolean checkConsistency) throws IgniteCheckedException {
         if (gridData != null && gridData.conflictErr != null)
             throw new IgniteCheckedException(gridData.conflictErr);
 
@@ -330,7 +332,7 @@ class ClusterCachesInfo {
      * @param msg Message.
      * @param node Node sent message.
      */
-    void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNode node) {
+    public void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNode node)
{
         Map<Integer, Boolean> startedCaches = msg.startedCaches();
 
         if (startedCaches != null) {
@@ -359,12 +361,13 @@ class ClusterCachesInfo {
             }
         }
     }
+
     /**
      * @param batch Cache change request.
      * @param topVer Topology version.
      * @return {@code True} if minor topology version should be increased.
      */
-    boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion
topVer) {
+    public boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion
topVer) {
         DiscoveryDataClusterState state = ctx.state().clusterState();
 
         if (state.active() && !state.transition()) {
@@ -779,30 +782,28 @@ class ClusterCachesInfo {
      *
      * @return Caches to be started when this node starts.
      */
-    List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin()
{
+    @NotNull public List<T2<DynamicCacheDescriptor, NearCacheConfiguration>>
cachesToStartOnLocalJoin() {
         if (ctx.isDaemon())
             return Collections.emptyList();
 
-        assert locJoinStartCaches != null;
-
-        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches
= this.locJoinStartCaches;
+        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> result = locJoinStartCaches;
 
-        this.locJoinStartCaches = null;
+        locJoinStartCaches = Collections.emptyList();
 
-        return locJoinStartCaches;
+        return result;
     }
 
     /**
      * @param joinedNodeId Joined node ID.
      * @return New caches received from joined node.
      */
-    List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
+    @NotNull public List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId)
{
         assert joinedNodeId != null;
 
         List<DynamicCacheDescriptor> started = null;
 
         if (!ctx.isDaemon()) {
-            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.DIRECT)) {
                 if (desc.staticallyConfigured()) {
                     assert desc.receivedFrom() != null : desc;
 
@@ -826,7 +827,7 @@ class ClusterCachesInfo {
      * @param node Event node.
      * @param topVer Topology version.
      */
-    void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+    public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer)
{
         if (type == EVT_NODE_JOINED && !ctx.isDaemon()) {
             for (CacheGroupDescriptor desc : registeredCacheGrps.values()) {
                 if (node.id().equals(desc.receivedFrom()))
@@ -856,7 +857,7 @@ class ClusterCachesInfo {
     /**
      * @param dataBag Discovery data bag.
      */
-    void collectGridNodeData(DiscoveryDataBag dataBag) {
+    public void collectGridNodeData(DiscoveryDataBag dataBag) {
         if (ctx.isDaemon())
             return;
 
@@ -931,7 +932,7 @@ class ClusterCachesInfo {
     /**
      * @param data Discovery data.
      */
-    void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+    public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
         if (ctx.isDaemon() || data.commonData() == null)
             return;
 
@@ -1045,6 +1046,9 @@ class ClusterCachesInfo {
     }
 
     /**
+     * Initializes the collection of caches to be started:
+     * {@code locJoinStartCaches}, or {@code locCfgsForActivation} if the cluster is inactive.
+     *
      * @param firstNode {@code True} if first node in cluster starts.
      */
     private void initStartCachesForLocalJoin(boolean firstNode) {
@@ -1062,7 +1066,7 @@ class ClusterCachesInfo {
 
             boolean active = ctx.state().clusterState().active();
 
-            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.DIRECT)) {
                 if (firstNode && !joinDiscoData.caches().containsKey(desc.cacheName()))
                     continue;
 
@@ -1096,13 +1100,8 @@ class ClusterCachesInfo {
                 if (locCfg != null ||
                     joinDiscoData.startCaches() ||
                     CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter()))
{
-                    if (active) {
-                        // Move system and internal caches first.
-                        if (desc.cacheType().userCache())
-                            locJoinStartCaches.add(new T2<>(desc, nearCfg));
-                        else
-                            locJoinStartCaches.add(0, new T2<>(desc, nearCfg));
-                    }
+                    if (active)
+                        locJoinStartCaches.add(new T2<>(desc, nearCfg));
                     else
                         locCfgsForActivation.put(desc.cacheName(), new T2<>(desc.cacheConfiguration(),
nearCfg));
                 }
@@ -1113,7 +1112,7 @@ class ClusterCachesInfo {
     /**
      * @param msg Message.
      */
-    void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
+    public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
         if (joinOnTransition) {
             initStartCachesForLocalJoin(false);
 
@@ -1127,17 +1126,14 @@ class ClusterCachesInfo {
      * @return Exchange action.
      * @throws IgniteCheckedException If configuration validation failed.
      */
-    ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion
topVer)
+    public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion
topVer)
         throws IgniteCheckedException {
         ExchangeActions exchangeActions = new ExchangeActions();
 
         if (msg.activate()) {
-            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.DIRECT)) {
                 desc.startTopologyVersion(topVer);
 
-                T2<CacheConfiguration, NearCacheConfiguration> locCfg = !F.isEmpty(locCfgsForActivation)
?
-                    locCfgsForActivation.get(desc.cacheName()) : null;
-
                 DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(),
                     desc.cacheName(),
                     msg.initiatorNodeId());
@@ -1145,6 +1141,8 @@ class ClusterCachesInfo {
                 req.startCacheConfiguration(desc.cacheConfiguration());
                 req.cacheType(desc.cacheType());
 
+                T2<CacheConfiguration, NearCacheConfiguration> locCfg = locCfgsForActivation.get(desc.cacheName());
+
                 if (locCfg != null) {
                     if (locCfg.get1() != null)
                         req.startCacheConfiguration(locCfg.get1());
@@ -1199,7 +1197,7 @@ class ClusterCachesInfo {
         else {
             locCfgsForActivation = new HashMap<>();
 
-            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.REVERSE)) {
                 DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx,
                     desc.cacheName(),
                     desc.sql(),
@@ -1221,7 +1219,7 @@ class ClusterCachesInfo {
     /**
      * @param data Joining node data.
      */
-    void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
+    public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data)
{
         if (data.hasJoiningNodeData()) {
             Serializable joiningNodeData = data.joiningNodeData();
 
@@ -1264,8 +1262,10 @@ class ClusterCachesInfo {
     }
 
     /**
+     * Checks cache configuration on conflict with already registered caches and cache groups.
+     *
      * @param cfg Cache configuration.
-     * @return {@code True} if validation passed.
+     * @return {@code null} if validation passed, error message in other case.
      */
     private String checkCacheConflict(CacheConfiguration<?, ?> cfg) {
         int cacheId = CU.cacheId(cfg.getName());
@@ -1480,17 +1480,10 @@ class ClusterCachesInfo {
     }
 
     /**
-     * @return Registered cache groups.
-     */
-    ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGroups() {
-        return registeredCacheGrps;
-    }
-
-    /**
      * @param ccfg Cache configuration to start.
      * @throws IgniteCheckedException If failed.
      */
-    void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException
{
+    public void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException
{
         if (ccfg.getGroupName() != null) {
             CacheGroupDescriptor grpDesc = cacheGroupByName(ccfg.getGroupName());
 
@@ -1563,9 +1556,29 @@ class ClusterCachesInfo {
     }
 
     /**
+     * @return Registered cache groups.
+     */
+    ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGroups() {
+        return registeredCacheGrps; // Hands back the field itself, not a copy.
+    }
+
+    /**
+     * Builds a sorted copy of the registered cache descriptors.
+     * @param comparator Ordering to apply (DIRECT, REVERSE or custom).
+     * @return New collection of cache descriptors sorted by {@code comparator}.
+     */
+    private Collection<DynamicCacheDescriptor> orderedCaches(Comparator<DynamicCacheDescriptor> comparator) {
+        List<DynamicCacheDescriptor> sorted = new ArrayList<>(registeredCaches.values());
+
+        Collections.sort(sorted, comparator);
+
+        return sorted;
+    }
+
+    /**
      *
      */
-    void onDisconnect() {
+    public void onDisconnected() {
         cachesOnDisconnect = new CachesOnDisconnect(
             ctx.state().clusterState(),
             new HashMap<>(registeredCacheGrps),
@@ -1583,7 +1596,7 @@ class ClusterCachesInfo {
      * @param transition {@code True} if reconnected while state transition in progress.
      * @return Information about stopped caches and cache groups.
      */
-    ClusterCachesReconnectResult onReconnected(boolean active, boolean transition) {
+    public ClusterCachesReconnectResult onReconnected(boolean active, boolean transition)
{
         assert disconnectedState();
 
         Set<String> stoppedCaches = new HashSet<>();
@@ -1685,6 +1698,38 @@ class ClusterCachesInfo {
     }
 
     /**
+     * Holds direct comparator (first system caches) and reverse comparator (first user caches).
+     * Use DIRECT comparator for ordering cache start operations.
+     * Use REVERSE comparator for ordering cache stop operations.
+     */
+    private static class CacheComparators {
+        /**
+         * DIRECT comparator for cache descriptors: system (non-user) caches first, then by cache ID.
+         */
+        static final Comparator<DynamicCacheDescriptor> DIRECT = new Comparator<DynamicCacheDescriptor>() {
+            @Override
+            public int compare(DynamicCacheDescriptor o1, DynamicCacheDescriptor o2) {
+                boolean usr1 = o1.cacheType().userCache();
+
+                if (usr1 != o2.cacheType().userCache())
+                    return usr1 ? 1 : -1; // Exactly one is a user cache: the non-user one sorts first.
+
+                return o1.cacheId().compareTo(o2.cacheId());
+            }
+        };
+
+        /**
+         * REVERSE comparator for cache descriptors: exact opposite of DIRECT (first user caches).
+         */
+        static final Comparator<DynamicCacheDescriptor> REVERSE = new Comparator<DynamicCacheDescriptor>() {
+            @Override
+            public int compare(DynamicCacheDescriptor o1, DynamicCacheDescriptor o2) {
+                return DIRECT.compare(o2, o1);
+            }
+        };
+    }
+
+    /**
      *
      */
     private static class GridData {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7915fd88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 1cc6438..91ad003 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -224,7 +224,7 @@ public class ExchangeActions {
         assert desc != null;
 
         if (map == null)
-            map = new HashMap<>();
+            map = new LinkedHashMap<>();
 
         CacheActionData old = map.put(req.cacheName(), new CacheActionData(req, desc));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7915fd88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5b709b3..9902a92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -146,6 +146,7 @@ import org.apache.ignite.spi.IgniteNodeValidationResult;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
@@ -1046,7 +1047,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         sharedCtx.onDisconnected(reconnectFut);
 
-        cachesInfo.onDisconnect();
+        cachesInfo.onDisconnected();
     }
 
     /**
@@ -1733,7 +1734,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * @return Caches to be started when this node starts.
      */
-    public List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin()
{
+    @NotNull public List<T2<DynamicCacheDescriptor, NearCacheConfiguration>>
cachesToStartOnLocalJoin() {
         return cachesInfo.cachesToStartOnLocalJoin();
     }
 
@@ -1771,22 +1772,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         throws IgniteCheckedException {
         List<DynamicCacheDescriptor> started = cachesInfo.cachesReceivedFromJoin(nodeId);
 
-        if (started != null) {
-            for (DynamicCacheDescriptor desc : started) {
-                IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter();
-
-                if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
-                    prepareCacheStart(
-                        desc.cacheConfiguration(),
-                        desc,
-                        null,
-                        exchTopVer
-                    );
-                }
+        for (DynamicCacheDescriptor desc : started) {
+            IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter();
+
+            if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
+                prepareCacheStart(
+                    desc.cacheConfiguration(),
+                    desc,
+                    null,
+                    exchTopVer
+                );
             }
         }
 
-        return started != null ? started : Collections.<DynamicCacheDescriptor>emptyList();
+        return started;
     }
 
     /**


Mime
View raw message