ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [2/2] ignite git commit: IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way
Date Mon, 21 Nov 2016 20:27:04 GMT
IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2d54d274
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2d54d274
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2d54d274

Branch: refs/heads/ignite-4242
Commit: 2d54d274aa804d8e445887fbe8875ed3bc0d4eab
Parents: 0086122
Author: Anton Vinogradov <av@apache.org>
Authored: Mon Nov 21 23:26:49 2016 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Mon Nov 21 23:26:49 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 12 +++--
 .../dht/preloader/GridDhtPartitionDemander.java | 51 ++++++++------------
 2 files changed, 27 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2d54d274/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index dd0256a..75aeeda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1569,12 +1569,14 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                             for (Integer cacheId : orderMap.get(order)) {
                                 GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
-                                Runnable cur = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
+                                GridDhtPreloaderAssignments assignments = assignsMap.get(cacheId);
+
+                                Runnable cur = cacheCtx.preloader().addAssignments(assignments,
                                     forcePreload,
                                     cnt,
                                     r);
 
-                                if (cur != null)
+                                if (cur != null && !assignments.isEmpty())
                                     rebList.add(cacheCtx.name());
 
                                 rebFuts.add(cacheCtx.preloader().rebalanceFuture());
@@ -1588,9 +1590,11 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                                 fut.get();
 
                         if (r != null) {
-                            Collections.reverse(rebList);
+                            if (!rebList.isEmpty()) {
+                                Collections.reverse(rebList);
 
-                            U.log(log, "Cache rebalancing scheduled: [order=" + rebList +
"]");
+                                U.log(log, "Cache rebalancing scheduled: [order=" + rebList
+ "]");
+                            }
 
                             if (futQ.isEmpty()) {
                                 U.log(log, "Rebalancing required " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d54d274/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 41b7bc1..fadec5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -287,25 +287,37 @@ public class GridDhtPartitionDemander {
                     }
                 });
 
+            if (assigns.isEmpty()) {
+                U.log(log, "Rebalancing is not required.");
+
+                ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+            }
+
             return new Runnable() {
                 @Override public void run() {
+                    if (assigns.isEmpty()) {
+                        rebalanceFut.onDone(true); // Starts next cache preloading (according
to order).
+
+                        return;
+                    }
+
                     try {
-                        requestPartitions(fut, assigns);
+                        requestPartitions(rebalanceFut, assigns);
                     }
-                    catch (ClusterTopologyCheckedException e){
+                    catch (ClusterTopologyCheckedException e) {
                         log.warning("Failed to send initial demand request to node.", e);
 
-                        fut.cancel();
+                        rebalanceFut.cancel();
                     }
                     catch (IgniteCheckedException e) {
                         log.error("Failed to send initial demand request to node.", e);
 
-                        fut.cancel();
+                        rebalanceFut.cancel();
                     }
                     catch (Throwable th) {
                         log.error("Runtime error caught during initial demand request sending.",
th);
 
-                        fut.cancel();
+                        rebalanceFut.cancel();
 
                         if (th instanceof Error)
                             throw th;
@@ -351,12 +363,6 @@ public class GridDhtPartitionDemander {
         RebalanceFuture fut,
         GridDhtPreloaderAssignments assigns
     ) throws IgniteCheckedException {
-        if (assigns.isEmpty()) {
-            fut.doneIfEmpty(assigns.cancelled());
-
-            return;
-        }
-
         if (topologyChanged(fut)) {
             fut.cancel();
 
@@ -382,7 +388,7 @@ public class GridDhtPartitionDemander {
 
             //Check remote node rebalancing API version.
             if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >=
0) {
-                U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode()
+
+                U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() +
                     ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
                     ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq
+ "]");
 
@@ -841,24 +847,6 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         * @param cancelled Is cancelled.
-         */
-        private void doneIfEmpty(boolean cancelled) {
-            synchronized (this) {
-                if (isDone())
-                    return;
-
-                assert remaining.isEmpty();
-
-                if (log.isDebugEnabled())
-                    log.debug("Rebalancing is not required [cache=" + cctx.name() +
-                        ", topology=" + topVer + "]");
-
-                checkIsDone(cancelled, true);
-            }
-        }
-
-        /**
          * Cancels this future.
          *
          * @return {@code True}.
@@ -981,8 +969,7 @@ public class GridDhtPartitionDemander {
 
                 if (parts.isEmpty()) {
                     U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "")
+
-                        "rebalancing [cache=" + cctx.name() +
-                        ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+                        "rebalancing [fromNode=" + nodeId + ", topology=" + topologyVersion()
+
                         ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]"));
 
                     remaining.remove(nodeId);


Mime
View raw message