ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dma...@apache.org
Subject ignite git commit: slow rebalancing
Date Wed, 30 Sep 2015 09:31:24 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-slow-rebal 1597e6ca1 -> a950de966


slow rebalancing


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a950de96
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a950de96
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a950de96

Branch: refs/heads/ignite-slow-rebal
Commit: a950de96624dcef64b6e8935c4f82ef7629035b3
Parents: 1597e6c
Author: Denis Magda <dmagda@gridgain.com>
Authored: Wed Sep 30 12:30:47 2015 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Wed Sep 30 12:30:47 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 48 ++++++++++++++++++--
 .../preloader/GridDhtPartitionsFullMessage.java |  4 +-
 .../GridDhtPartitionsSingleMessage.java         |  4 +-
 3 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a950de96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index eb76233..630d57a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -626,6 +627,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * Schedules next full partitions update.
      */
     public void scheduleResendPartitions() {
+        log.info("scheduleResendPartitoins");
+
         ResendTimeoutObject timeout = pendingResend.get();
 
         if (timeout == null || timeout.started()) {
@@ -668,6 +671,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 log.debug("Refreshing local partitions from non-oldest node: " +
                     cctx.localNodeId());
 
+            log.info("Refreshing local partitions from non-oldest node: [locNode= " +
+                cctx.localNodeId() + ']');
+
             sendLocalPartitions(oldest, null);
         }
     }
@@ -701,9 +707,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) {
         GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
 
+        List<String> caches = new ArrayList<>();
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal() && cacheCtx.started())
+            if (!cacheCtx.isLocal() && cacheCtx.started()) {
                 m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+                caches.add(cacheCtx.name());
+            }
         }
 
         // It is important that client topologies be added after contexts.
@@ -713,9 +723,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (log.isDebugEnabled())
             log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
 
+        log.info("Before sending all partitions: [rmtNodes=" + nodes + ']');
+
         for (ClusterNode node : nodes) {
             try {
+                long time = System.currentTimeMillis();
+
+                log.info("Start sending all partitions [caches=" + caches + ", time=" + new Date(time) + ", node=" + node + ']');
+
                 cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
+
+                long passed = System.currentTimeMillis();
+
+                log.info("Stop sending all partitions [caches=" + caches + ",time=" + new Date(passed) + ", diff=" + (passed - time) +
+                    ", node=" + node + ']');
             }
             catch (ClusterTopologyCheckedException ignore) {
                 if (log.isDebugEnabled())
@@ -904,6 +925,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 if (log.isDebugEnabled())
                     log.debug("Received full partition update [node=" + node.id() + ", msg=" + msg + ']');
 
+                log.info("Received full partition update [node=" + node.id() + ", msg=" + msg + ", time=" + new Date() + ']');
+
                 boolean updated = false;
 
                 for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
@@ -925,8 +948,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         updated |= top.update(null, entry.getValue()) != null;
                 }
 
-                if (!cctx.kernalContext().clientNode() && updated)
+                if (!cctx.kernalContext().clientNode() && updated) {
+                    log.info("refreshPartitions: processFullPartitionUpdate");
                     refreshPartitions();
+                }
             }
             else
                 exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
@@ -950,6 +975,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     log.debug("Received local partition update [nodeId=" + node.id() + ", parts=" +
                         msg + ']');
 
+                log.info("Received local partition update [nodeId=" + node.id() + ", parts=" +
+                    msg + ']');
+
                 boolean updated = false;
 
                 for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
@@ -968,8 +996,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         updated |= top.update(null, entry.getValue()) != null;
                 }
 
-                if (updated)
+                if (updated) {
+                    log.info("Partitions updated, schedule: [sender=" + node + ']');
                     scheduleResendPartitions();
+                }
             }
             else {
                 if (msg.client()) {
@@ -1149,6 +1179,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     // If not first preloading and no more topology events present,
                     // then we periodically refresh partition map.
                     if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) {
+                        log.info("refreshPartitions: preloadFinished");
+
                         refreshPartitions(timeout);
 
                         timeout = cctx.gridConfig().getNetworkTimeout();
@@ -1214,8 +1246,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                             startEvtFired = true;
 
-                            if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty())
+                            if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty()) {
+                                log.info("refreshPartitions: ExcahngeWorker body");
+
                                 refreshPartitions();
+                            }
                         }
                         else {
                             if (log.isDebugEnabled())
@@ -1311,8 +1346,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         return;
 
                     try {
-                        if (started.compareAndSet(false, true))
+                        if (started.compareAndSet(false, true)) {
+                            log.info("refreshPartitions: ResendTimeoutObject");
+
                             refreshPartitions();
+                        }
                     }
                     finally {
                         busyLock.readLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a950de96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index b91a2de..50e2e41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -95,7 +95,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (parts != null)
+        if (partsBytes == null && parts != null)
             partsBytes = ctx.marshaller().marshal(parts);
     }
 
@@ -200,4 +200,4 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         return S.toString(GridDhtPartitionsFullMessage.class, this, "partCnt", parts != null ? parts.size() : 0,
             "super", super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a950de96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 9b6dcf7..85d8d0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -98,7 +98,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (parts != null)
+        if (partsBytes == null && parts != null)
             partsBytes = ctx.marshaller().marshal(parts);
     }
 
@@ -188,4 +188,4 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     @Override public String toString() {
         return S.toString(GridDhtPartitionsSingleMessage.class, this, super.toString());
     }
-}
\ No newline at end of file
+}


Mime
View raw message