ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [01/14] ignite git commit: IGNITE-6667 wip.
Date Wed, 25 Oct 2017 14:01:22 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-6667 [created] 46501d92b


IGNITE-6667 wip.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee0c108a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee0c108a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee0c108a

Branch: refs/heads/ignite-6667
Commit: ee0c108a7287519516d1a7350223004927a81673
Parents: 3b18170
Author: ascherbakoff <alexey.scherbakoff@gmail.com>
Authored: Sat Oct 21 14:09:47 2017 +0300
Committer: ascherbakoff <alexey.scherbakoff@gmail.com>
Committed: Sat Oct 21 14:09:47 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java | 58 +++++++++----
 .../discovery/GridDiscoveryManager.java         | 87 ++++++++++----------
 2 files changed, 85 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ee0c108a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 4b57eb8..7206223 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -17,13 +17,6 @@
 
 package org.apache.ignite.internal.managers.discovery;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
@@ -36,6 +29,13 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 /**
  *
  */
@@ -108,7 +108,8 @@ public class DiscoCache {
         Map<Integer, List<ClusterNode>> allCacheNodes,
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
         Map<UUID, ClusterNode> nodeMap,
-        Set<UUID> alives) {
+        Set<UUID> alives,
+        @Nullable IgniteProductVersion minVer) {
         this.topVer = topVer;
         this.state = state;
         this.loc = loc;
@@ -122,15 +123,15 @@ public class DiscoCache {
         this.nodeMap = nodeMap;
         this.alives.addAll(alives);
 
-        IgniteProductVersion minVer = null;
+        if (minVer == null) {
+            for (int i = 0; i < allNodes.size(); i++) {
+                ClusterNode node = allNodes.get(i);
 
-        for (int i = 0; i < allNodes.size(); i++) {
-            ClusterNode node = allNodes.get(i);
-
-            if (minVer == null)
-                minVer = node.version();
-            else if (node.version().compareTo(minVer) < 0)
-                minVer = node.version();
+                if (minVer == null)
+                    minVer = node.version();
+                else if (node.version().compareTo(minVer) < 0)
+                    minVer = node.version();
+            }
         }
 
         minNodeVer = minVer;
@@ -326,8 +327,31 @@ public class DiscoCache {
         return nodes == null ? Collections.<ClusterNode>emptyList() : nodes;
     }
 
+    /**
+     * Returns copy of discovery cache suitable for further reuse.
+     *
+     * @param ver Version.
+     * @return Copy.
+     */
+    public DiscoCache copy(AffinityTopologyVersion ver) {
+        return new DiscoCache(
+            ver,
+            state,
+            loc,
+            rmtNodes,
+            allNodes,
+            srvNodes,
+            daemonNodes,
+            rmtNodesWithCaches,
+            allCacheNodes,
+            cacheGrpAffNodes,
+            nodeMap,
+            alives,
+            minNodeVer);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DiscoCache.class, this);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee0c108a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index a3b157d..06bb16c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -69,13 +69,16 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
@@ -232,7 +235,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
     /** Topology version. */
     private final AtomicReference<Snapshot> topSnap =
-        new AtomicReference<>(new Snapshot(AffinityTopologyVersion.ZERO, null));
+        new AtomicReference<>(new Snapshot(AffinityTopologyVersion.ZERO, null, false));
 
     /** Minor topology version. */
     private int minorTopVer;
@@ -353,7 +356,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      */
     public void addCacheGroup(CacheGroupDescriptor grpDesc, IgnitePredicate<ClusterNode> filter, CacheMode cacheMode) {
         CacheGroupAffinity old = registeredCacheGrps.put(grpDesc.groupId(),
-            new CacheGroupAffinity(grpDesc.cacheOrGroupName(), filter, cacheMode));
+                new CacheGroupAffinity(grpDesc.cacheOrGroupName(), filter, cacheMode));
 
         assert old == null : old;
     }
@@ -599,7 +602,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 if (snapshots != null)
                     topHist = snapshots;
 
-                boolean verChanged;
+                boolean verChanged, incMinorTopVer = false, reusableForNextEvt = true, preventReuse = false;
 
                 if (type == EVT_NODE_METRICS_UPDATED)
                     verChanged = false;
@@ -644,34 +647,40 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                 final AffinityTopologyVersion nextTopVer;
 
+                final Snapshot snapshot = topSnap.get();
+
                 if (type == EVT_DISCOVERY_CUSTOM_EVT) {
                     assert customMsg != null;
 
-                    boolean incMinorTopVer;
-
                     if (customMsg instanceof ChangeGlobalStateMessage) {
                         incMinorTopVer = ctx.state().onStateChangeMessage(
                             new AffinityTopologyVersion(topVer, minorTopVer),
                             (ChangeGlobalStateMessage)customMsg,
                             discoCache());
+
+                        reusableForNextEvt = false;
                     }
                     else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
                         ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
 
-                        discoCache = createDiscoCache(topSnap.get().topVer,
+                        discoCache = createDiscoCache(snapshot.topVer,
                             ctx.state().clusterState(),
                             locNode,
                             topSnapshot);
 
-                        topSnap.set(new Snapshot(topSnap.get().topVer, discoCache));
-
-                        incMinorTopVer = false;
+                        topSnap.set(new Snapshot(snapshot.topVer, discoCache, true));
                     }
+                    else if (customMsg instanceof SnapshotDiscoveryMessage)
+                        incMinorTopVer = ((SnapshotDiscoveryMessage)customMsg).needExchange();
                     else {
                         incMinorTopVer = ctx.cache().onCustomEvent(
                             customMsg,
                             new AffinityTopologyVersion(topVer, minorTopVer),
                             node);
+
+                        reusableForNextEvt = customMsg instanceof CacheAffinityChangeMessage;
+
+                        preventReuse = customMsg instanceof DynamicCacheChangeBatch;
                     }
 
                     if (incMinorTopVer) {
@@ -709,19 +718,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 // event notifications, since SPI notifies manager about all events from this listener.
                 if (verChanged) {
                     if (discoCache == null) {
-                        discoCache = createDiscoCache(nextTopVer,
-                            ctx.state().clusterState(),
-                            locNode,
-                            topSnapshot);
+                        discoCache = incMinorTopVer && snapshot.reusable && !preventReuse ?
+                                snapshot.discoCache.copy(nextTopVer) :
+                                createDiscoCache(nextTopVer,
+                                        ctx.state().clusterState(),
+                                        locNode,
+                                        topSnapshot);
                     }
 
                     discoCacheHist.put(nextTopVer, discoCache);
 
-                    boolean set = updateTopologyVersionIfGreater(nextTopVer, discoCache);
-
-                    assert set || topVer == 0 : "Topology version has not been updated [this.topVer=" +
-                        topSnap + ", topVer=" + topVer + ", node=" + node +
+                    assert snapshot.topVer.compareTo(nextTopVer) < 0: "Topology version out of order [this.topVer=" +
+                        topSnap + ", topVer=" + topVer + ", node=" + node + ", nextTopVer=" + nextTopVer +
                         ", evt=" + U.gridEventName(type) + ']';
+
+                    topSnap.set(new Snapshot(nextTopVer, discoCache, reusableForNextEvt));
                 }
                 else
                     // Current version.
@@ -734,8 +745,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     if (gridStartTime == 0)
                         gridStartTime = getSpi().getGridStartTime();
 
-                    updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()),
-                        discoCache);
+                    final AffinityTopologyVersion locTopVer = new AffinityTopologyVersion(locNode.order());
+
+                    if (topSnap.get().topVer.compareTo(locTopVer) < 0)
+                        topSnap.set(new Snapshot(locTopVer, discoCache, true));
 
                     startLatch.countDown();
 
@@ -784,7 +797,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     topHist.clear();
 
                     topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
-                        createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode, Collections.<ClusterNode>singleton(locNode))));
+                        createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode,
+                            Collections.<ClusterNode>singleton(locNode)),
+                        true));
                 }
                 else if (type == EVT_CLIENT_NODE_RECONNECTED) {
                     assert locNode.isClient() : locNode;
@@ -1585,7 +1600,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
         if (!locJoin.isDone())
             locJoin.onDone(
-                new IgniteCheckedException("Failed to wait for local node joined event (grid is stopping)."));
+                    new IgniteCheckedException("Failed to wait for local node joined event (grid is stopping)."));
     }
 
     /** {@inheritDoc} */
@@ -2287,7 +2302,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             Collections.unmodifiableMap(allCacheNodes),
             Collections.unmodifiableMap(cacheGrpAffNodes),
             Collections.unmodifiableMap(nodeMap),
-            alives);
+            alives,
+                null);
     }
 
     /**
@@ -2309,26 +2325,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         cacheNodes.add(rich);
     }
 
-    /**
-     * Updates topology version if current version is smaller than updated.
-     *
-     * @param updated Updated topology version.
-     * @param discoCache Discovery cache.
-     * @return {@code True} if topology was updated.
-     */
-    private boolean updateTopologyVersionIfGreater(AffinityTopologyVersion updated, DiscoCache discoCache) {
-        while (true) {
-            Snapshot cur = topSnap.get();
-
-            if (updated.compareTo(cur.topVer) >= 0) {
-                if (topSnap.compareAndSet(cur, new Snapshot(updated, discoCache)))
-                    return true;
-            }
-            else
-                return false;
-        }
-    }
-
     /** Stops local node. */
     private void stopNode() {
         new Thread(
@@ -2893,13 +2889,18 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         @GridToStringExclude
         private final DiscoCache discoCache;
 
+        /** */
+        private final boolean reusable;
+
         /**
          * @param topVer Topology version.
          * @param discoCache Disco cache.
+         * @param reusable Discovery cache can be reused between topology changes.
          */
-        private Snapshot(AffinityTopologyVersion topVer, DiscoCache discoCache) {
+        private Snapshot(AffinityTopologyVersion topVer, DiscoCache discoCache, boolean reusable) {
             this.topVer = topVer;
             this.discoCache = discoCache;
+            this.reusable = reusable;
         }
 
         /** {@inheritDoc} */


Mime
View raw message