ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 1093
Date Fri, 16 Oct 2015 16:30:25 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 6629fea44 -> 57064970f


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57064970
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57064970
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57064970

Branch: refs/heads/ignite-1093-2
Commit: 57064970fe04ccdeefed9800250bc8f0c2cb831f
Parents: 6629fea
Author: Anton Vinogradov <avinogradov@gridgain.com>
Authored: Fri Oct 16 09:23:50 2015 +0300
Committer: Anton Vinogradov <avinogradov@gridgain.com>
Committed: Fri Oct 16 09:23:50 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 11 +++---
 .../dht/preloader/GridDhtPartitionSupplier.java | 38 ++++++++++++--------
 2 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/57064970/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 3caff69..b8aa2b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -398,9 +398,8 @@ public class GridDhtPartitionDemander {
                             cctx.io().sendOrderedMessage(node,
                                 GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
 
-//                            if (log.isDebugEnabled())
-//                                log.debug(
-                                    U.log(log,"Requested rebalancing [from node=" + node.id() + ", listener index=" +
+                            if (log.isDebugEnabled())
+                                log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
                                     cnt + ", partitions count=" + sParts.get(cnt).size() +
                                     " (" + partitionsList(sParts.get(cnt)) + ")]");
                         }
@@ -500,9 +499,8 @@ public class GridDhtPartitionDemander {
             return;
         }
 
-//        if (log.isDebugEnabled())
-//            log.debug(
-                U.log(log,"Received supply message: " + supply);
+        if (log.isDebugEnabled())
+            log.debug("Received supply message: " + supply);
 
         // Check whether there were class loading errors on unmarshal
         if (supply.classError() != null) {
@@ -982,7 +980,6 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         *
          * @param cancelled Is cancelled.
          */
         private void checkIsDone(boolean cancelled) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/57064970/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index d68ce8b..00d1b9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -187,9 +188,22 @@ class GridDhtPartitionSupplier {
         assert d != null;
         assert id != null;
 
-        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion())) {
+        AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion demTop = d.topologyVersion();
+
+        if (!cutTop.equals(demTop)) {
+            if (cutTop.compareTo(demTop) < 0)
+                // Resend demand message.
+                try {
+                    cctx.io().sendOrderedMessage(cctx.localNode(), GridCachePartitionExchangeManager.rebalanceTopic(idx),
+                        d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to resend partition supply message to local node: " + cctx.localNode().id());
+                }
+            else if (log.isDebugEnabled())
+                log.debug("Demand request cancelled [current=" + cctx.affinity().affinityTopologyVersion() + ", demanded=" + d.topologyVersion() + "]");
 
-            U.log(log, "Demand request cancelled [current=" + cctx.affinity().affinityTopologyVersion() + ", demanded=" + d.topologyVersion() + "]");
             return;
         }
 
@@ -228,9 +242,8 @@ class GridDhtPartitionSupplier {
                 maxBatchesCnt = 1;
             }
             else {
-//                if (log.isDebugEnabled())
-//                    log.debug
-                        U.log(log, "Starting supplying rebalancing [cache=" + cctx.name() +
+                if (log.isDebugEnabled())
+                    log.debug("Starting supplying rebalancing [cache=" + cctx.name() +
                         ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
                         ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
                         ", idx=" + idx + "]");
@@ -571,9 +584,8 @@ class GridDhtPartitionSupplier {
 
             reply(node, d, s, scId);
 
-//            if (log.isDebugEnabled())
-//                log.debug(
-                    U.log(log, "Finished supplying rebalancing [cache=" + cctx.name() +
+            if (log.isDebugEnabled())
+                log.debug("Finished supplying rebalancing [cache=" + cctx.name() +
                     ", fromNode=" + node.id() +
                     ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
                     ", idx=" + idx + "]");
@@ -597,9 +609,8 @@ class GridDhtPartitionSupplier {
         throws IgniteCheckedException {
 
         try {
-//            if (log.isDebugEnabled())
-//                log.debug
-                   U.log(log, "Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+            if (log.isDebugEnabled())
+                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
 
             cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
 
@@ -610,9 +621,8 @@ class GridDhtPartitionSupplier {
             return true;
         }
         catch (ClusterTopologyCheckedException ignore) {
-//            if (log.isDebugEnabled())
-//                log.debug
-                    U.log(log, "Failed to send partition supply message because node left grid: " + n.id());
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
 
             clearContext(scMap.remove(scId), log);
 


Mime
View raw message