ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject ignite git commit: IGNITE-7717 Fixed missed updateTopologyVersion call in case of a merged exchange - Fixes #3536.
Date Thu, 01 Mar 2018 12:42:27 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 0d714086d -> 623d3b6fa


IGNITE-7717 Fixed missed updateTopologyVersion call in case of a merged exchange - Fixes #3536.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/623d3b6f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/623d3b6f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/623d3b6f

Branch: refs/heads/master
Commit: 623d3b6fa971d6117c4cef92118b72418ec56974
Parents: 0d71408
Author: Pavel Kovalenko <jokserfn@gmail.com>
Authored: Thu Mar 1 15:41:52 2018 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Thu Mar 1 15:41:52 2018 +0300

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        | 10 ++++++--
 .../dht/GridDhtPartitionTopologyImpl.java       | 11 +++++++--
 .../GridDhtPartitionsExchangeFuture.java        | 24 ++++++++++++++++++++
 3 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/623d3b6f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index def00f3..5c98ee8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -206,9 +206,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
         try {
             AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
 
-            assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [grp="
+ grpId +
+            // Update is correct if topology version is newer or in case of newer discovery
caches.
+            boolean isCorrectUpdate = exchTopVer.compareTo(topVer) > 0
+                    || (exchTopVer.compareTo(topVer) == 0 && this.discoCache != null
&& discoCache.version().compareTo(this.discoCache.version()) > 0);
+
+            assert isCorrectUpdate : "Invalid topology version [grp=" + grpId +
                 ", topVer=" + topVer +
-                ", exchVer=" + exchTopVer + ']';
+                ", exchVer=" + exchTopVer +
+                ", discoCacheVer=" + (this.discoCache != null ? this.discoCache.version()
: "None") +
+                ", exchDiscoCacheVer=" + discoCache.version() + ']';
 
             this.stopping = stopping;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/623d3b6f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 020c3e7..1234042 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
@@ -235,7 +236,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
     /** {@inheritDoc} */
     @Override public void updateTopologyVersion(
         GridDhtTopologyFuture exchFut,
-        DiscoCache discoCache,
+        @NotNull DiscoCache discoCache,
         long updSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException {
@@ -244,9 +245,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
         try {
             AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
 
-            assert exchTopVer.compareTo(readyTopVer) > 0 : "Invalid topology version [grp="
+ grp.cacheOrGroupName() +
+            // Update is correct if topology version is newer or in case of newer discovery
caches.
+            boolean isCorrectUpdate = exchTopVer.compareTo(readyTopVer) > 0
+                    || (exchTopVer.compareTo(readyTopVer) == 0 && this.discoCache
!= null && discoCache.version().compareTo(this.discoCache.version()) > 0);
+
+            assert isCorrectUpdate : "Invalid topology version [grp=" + grp.cacheOrGroupName()
+
                 ", topVer=" + readyTopVer +
                 ", exchTopVer=" + exchTopVer +
+                ", discoCacheVer=" + (this.discoCache != null ? this.discoCache.version()
: "None") +
+                ", exchDiscoCacheVer=" + discoCache.version() +
                 ", fut=" + exchFut + ']';
 
             this.stopping = stopping;

http://git-wip-us.apache.org/repos/asf/ignite/blob/623d3b6f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d3e51e0..a56c4e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.processors.cache.ExchangeContext;
 import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext;
 import org.apache.ignite.internal.processors.cache.StateChangeRequest;
@@ -806,6 +807,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Updates topology versions and discovery caches on all topologies.
+     *
      * @param crd Coordinator flag.
      * @throws IgniteCheckedException If failed.
      */
@@ -1839,6 +1842,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Checks that some futures were merged to the current.
+     * Future without merges has only one DiscoveryEvent.
+     * If we merge futures to the current (see {@link GridCachePartitionExchangeManager#mergeExchanges(GridDhtPartitionsExchangeFuture,
GridDhtPartitionsFullMessage)})
+     * we add new discovery event from merged future.
+     *
+     * @return {@code True} If some futures were merged to current, false in other case.
+     */
+    private boolean hasMergedExchanges() {
+        return context().events().events().size() > 1;
+    }
+
+    /**
      * @param fut Current future.
      * @return Pending join request if any.
      */
@@ -2382,6 +2397,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 boolean finish = cctx.exchange().mergeExchangesOnCoordinator(this);
 
+                // Synchronize in case of changed coordinator (thread switched to sys-*)
+                synchronized (mux) {
+                    if (hasMergedExchanges())
+                        updateTopologies(true);
+                }
+
                 if (!finish)
                     return;
             }
@@ -2978,6 +2999,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         return; // Node is stopping, no need to further process exchange.
                     }
 
+                    if (hasMergedExchanges())
+                        updateTopologies(false);
+
                     assert resTopVer.equals(exchCtx.events().topologyVersion()) :  "Unexpected
result version [" +
                         "msgVer=" + resTopVer +
                         ", locVer=" + exchCtx.events().topologyVersion() + ']';


Mime
View raw message