ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 1093
Date Tue, 22 Sep 2015 16:08:23 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 94c9297ad -> a8b323d74


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a8b323d7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a8b323d7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a8b323d7

Branch: refs/heads/ignite-1093-2
Commit: a8b323d74d57f33b6fa11a8ef629a1a26325e783
Parents: 94c9297
Author: Anton Vinogradov <av@apache.org>
Authored: Tue Sep 22 19:05:00 2015 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Tue Sep 22 19:05:00 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 48 +++++++++++++-------
 .../GridCacheRebalancingSyncSelfTest.java       |  2 +-
 2 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a8b323d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 7f2dc48..bbb6a21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -256,6 +256,12 @@ public class GridDhtPartitionDemander {
 
             final SyncFuture oldFut = syncFut;
 
+            if (!oldFut.isDummy() && assigns.topologyVersion().compareTo(oldFut.topologyVersion()) < 0) {
+                U.log(log, "Skipping obsolete (dummy) exchange. [top=" + assigns.topologyVersion() + "]");
+
+                return;
+            }
+
             final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy());
 
             if (!oldFut.isDummy())
@@ -381,7 +387,7 @@ public class GridDhtPartitionDemander {
 
             U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
                 ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
-                ", topology=" + d.topologyVersion() + "]");
+                ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + "]");
 
             //Check remote node rebalancing API version.
             if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
@@ -411,19 +417,24 @@ public class GridDhtPartitionDemander {
                         initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
 
                         try {
-                            if (!topologyChanged(fut))
+                            if (!topologyChanged(fut)) {
                                 cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
-                            else
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]");
+
+                            }
+                            else {
                                 fut.cancel();
+
+                                return;
+                            }
                         }
                         catch (IgniteCheckedException ex) {
                             fut.cancel();
 
                             U.error(log, "Failed to send partition demand message to node", ex);
                         }
-
-                        if (log.isDebugEnabled())
-                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]");
                     }
                 }
             }
@@ -619,12 +630,12 @@ public class GridDhtPartitionDemander {
             }
             else
                 fut.cancel();
-
         }
         catch (ClusterTopologyCheckedException e) {
             if (log.isDebugEnabled())
                 log.debug("Node left during rebalancing [node=" + node.id() +
                     ", msg=" + e.getMessage() + ']');
+
             fut.cancel();
         }
         catch (IgniteCheckedException ex) {
@@ -990,20 +1001,23 @@ public class GridDhtPartitionDemander {
                 if (log.isDebugEnabled())
                     log.debug("Completed sync future.");
 
-                Collection<Integer> m = new HashSet<>();
+                if (cctx.affinity().affinityTopologyVersion().equals(topVer)) {
 
-                for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
-                    if (e.getValue() != null && !e.getValue().isEmpty())
-                        m.addAll(e.getValue());
-                }
+                    Collection<Integer> m = new HashSet<>();
 
-                if (!m.isEmpty()) {
-                    U.log(log, ("Reassigning partitions that were missed: " + m));
+                    for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
+                        if (e.getValue() != null && !e.getValue().isEmpty())
+                            m.addAll(e.getValue());
+                    }
 
-                    cctx.shared().exchange().forceDummyExchange(true, exchFut);
-                }
+                    if (!m.isEmpty()) {
+                        U.log(log, ("Reassigning partitions that were missed: " + m));
 
-                cctx.shared().exchange().scheduleResendPartitions();
+                        cctx.shared().exchange().forceDummyExchange(true, exchFut);
+                    }
+
+                    cctx.shared().exchange().scheduleResendPartitions();
+                }
 
                 onDone();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8b323d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 712f3cd..42c1857 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -172,7 +172,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
                 log.info("Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
 
             assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode()) :
-                "value " + (i + name.hashCode()) + " does not match (" + ignite.cache(name).get(i) + ")";
+                i + " value " + (i + name.hashCode()) + " does not match (" + ignite.cache(name).get(i) + ")";
         }
     }
 


Mime
View raw message