ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [46/46] ignite git commit: 1093
Date Fri, 02 Oct 2015 10:59:00 GMT
1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5de124cd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5de124cd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5de124cd

Branch: refs/heads/ignite-1093-2
Commit: 5de124cd9a270446c301b6782a0530c48758f035
Parents: f0f7c32
Author: Anton Vinogradov <av@apache.org>
Authored: Thu Oct 1 18:43:46 2015 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Thu Oct 1 18:43:46 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 24 +++++----
 .../dht/preloader/GridDhtPartitionSupplier.java | 56 ++++++++++++--------
 2 files changed, 47 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5de124cd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index d1d475c..56a9c9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -199,8 +199,8 @@ public class GridDhtPartitionDemander {
     private boolean topologyChanged(SyncFuture fut) {
         return
             !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || //
Topology already changed.
-            fut != syncFut || // Same topology, but dummy exchange forced because of missing
partitions.
-            cctx.shared().exchange().hasPendingExchange(); // New topology pending.
+                fut != syncFut || // Same topology, but dummy exchange forced because of
missing partitions.
+                cctx.shared().exchange().hasPendingExchange(); // New topology pending.
     }
 
     /**
@@ -262,12 +262,6 @@ public class GridDhtPartitionDemander {
 
             final SyncFuture oldFut = syncFut;
 
-            if (cctx.shared().exchange().hasPendingExchange()) { // Will rebalance at actual
topology.
-                U.log(log, "Skipping obsolete exchange. [top=" + assigns.topologyVersion()
+ "]");
-
-                return;
-            }
-
             final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy(), ++updateSeq);
 
             if (!oldFut.isDummy())
@@ -281,6 +275,14 @@ public class GridDhtPartitionDemander {
 
             syncFut = fut;
 
+            if (cctx.shared().exchange().hasPendingExchange()) { // Will rebalance at actual
topology.
+                U.log(log, "Skipping obsolete exchange. [top=" + assigns.topologyVersion()
+ "]");
+
+                fut.cancel();
+
+                return;
+            }
+
             if (assigns.isEmpty()) {
                 fut.doneIfEmpty();
 
@@ -397,7 +399,9 @@ public class GridDhtPartitionDemander {
                     ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size()
+
                     ", topology=" + d.topologyVersion() + ", updateSeq=" + fut.updateSeq
+ "]");
 
-                fut.appendPartitions(node.id(), d.partitions());
+                Collection<Integer> parts = new HashSet<>(d.partitions());
+
+                fut.appendPartitions(node.id(), parts);
 
                 int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
 
@@ -406,7 +410,7 @@ public class GridDhtPartitionDemander {
                 for (int cnt = 0; cnt < lsnrCnt; cnt++)
                     sParts.add(new HashSet<Integer>());
 
-                Iterator<Integer> it = d.partitions().iterator();
+                Iterator<Integer> it = parts.iterator();
 
                 int cnt = 0;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5de124cd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index b5bb25d..98946f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -46,6 +46,7 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
@@ -104,7 +105,7 @@ class GridDhtPartitionSupplier {
 
                             clearContext(sctx, log);
 
-                            U.log(log, "Supply context removed for failed node [node=" +
t.get1() + "]");
+                            U.log(log, "Supply context removed for node failed or left [node="
+ t.get1() + "]");
 
                             scMap.remove(t, sctx);
                         }
@@ -116,7 +117,7 @@ class GridDhtPartitionSupplier {
             }
         };
 
-        cctx.events().addListener(lsnr, EVT_NODE_FAILED);
+        cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
         startOldListeners();
     }
@@ -301,12 +302,12 @@ class GridDhtPartitionSupplier {
                                     swapLsnr = null;
                                     loc = null;
 
-                                    reply(node, d, s);
+                                    reply(node, d, s, scId);
 
                                     return;
                                 }
                                 else {
-                                    if (!reply(node, d, s))
+                                    if (!reply(node, d, s, scId))
                                         return;
 
                                     s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
@@ -388,12 +389,12 @@ class GridDhtPartitionSupplier {
                                         swapLsnr = null;
                                         loc = null;
 
-                                        reply(node, d, s);
+                                        reply(node, d, s, scId);
 
                                         return;
                                     }
                                     else {
-                                        if (!reply(node, d, s))
+                                        if (!reply(node, d, s, scId))
                                             return;
 
                                         s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
@@ -510,12 +511,12 @@ class GridDhtPartitionSupplier {
 
                                     loc = null;
 
-                                    reply(node, d, s);
+                                    reply(node, d, s, scId);
 
                                     return;
                                 }
                                 else {
-                                    if (!reply(node, d, s))
+                                    if (!reply(node, d, s, scId))
                                         return;
 
                                     s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
@@ -553,7 +554,7 @@ class GridDhtPartitionSupplier {
 
             scMap.remove(scId);
 
-            reply(node, d, s);
+            reply(node, d, s, scId);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send partition supply message to node: " + id, e);
@@ -567,7 +568,10 @@ class GridDhtPartitionSupplier {
      * @return {@code True} if message was sent, {@code false} if recipient left grid.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessageV2
s)
+    private boolean reply(ClusterNode n,
+        GridDhtPartitionDemandMessage d,
+        GridDhtPartitionSupplyMessageV2 s,
+        T2<UUID, Integer> scId)
         throws IgniteCheckedException {
 
         try {
@@ -575,8 +579,11 @@ class GridDhtPartitionSupplier {
                 log.debug("Replying to partition demand [node=" + n.id() + ", demand=" +
d + ", supply=" + s + ']');
 
             if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()) ||
// Topology already changed.
-                cctx.shared().exchange().hasPendingExchange()) // New topology pending.
-                return true;
+                cctx.shared().exchange().hasPendingExchange()) { // New topology pending.
+                clearContext(scMap.remove(scId), log);
+
+                return false;
+            }
 
             cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
 
@@ -590,6 +597,8 @@ class GridDhtPartitionSupplier {
             if (log.isDebugEnabled())
                 log.debug("Failed to send partition supply message because node left grid:
" + n.id());
 
+            clearContext(scMap.remove(scId), log);
+
             return false;
         }
     }
@@ -611,14 +620,15 @@ class GridDhtPartitionSupplier {
         GridDhtLocalPartition loc,
         AffinityTopologyVersion topVer,
         long updateSeq) {
-        SupplyContext old = scMap.putIfAbsent(t, new SupplyContext(phase,
-            partIt,
-            entryIt,
-            swapLsnr,
-            part,
-            loc,
-            topVer,
-            updateSeq));
+        SupplyContext old = scMap.putIfAbsent(t,
+            new SupplyContext(phase,
+                partIt,
+                entryIt,
+                swapLsnr,
+                part,
+                loc,
+                topVer,
+                updateSeq));
 
         assert old == null;
     }
@@ -643,13 +653,13 @@ class GridDhtPartitionSupplier {
         private final int part;
 
         /** Local partition. */
-        GridDhtLocalPartition loc;
+        private final GridDhtLocalPartition loc;
 
         /** Topology version. */
-        AffinityTopologyVersion topVer;
+        private final AffinityTopologyVersion topVer;
 
         /** Update seq. */
-        long updateSeq;
+        private final long updateSeq;
 
         /**
          * @param phase Phase.


Mime
View raw message