ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject ignite git commit: IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way
Date Thu, 24 Nov 2016 13:12:17 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4242 cfe5ebf60 -> 38df717f8


IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/38df717f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/38df717f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/38df717f

Branch: refs/heads/ignite-4242
Commit: 38df717f8ccb70538c871c92cda0528beb2eca65
Parents: cfe5ebf
Author: Anton Vinogradov <av@apache.org>
Authored: Thu Nov 24 16:11:59 2016 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Thu Nov 24 16:11:59 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 46 ++++++-----
 .../dht/preloader/GridDhtPartitionDemander.java | 83 ++++++++++++--------
 2 files changed, 73 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/38df717f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index c123774..15ea3f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -30,7 +30,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
@@ -1387,8 +1386,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             int cnt = 0;
 
-            Set<IgniteInternalFuture<Boolean>> rebFuts = null;
-
             while (!isCancelled()) {
                 GridDhtPartitionsExchangeFuture exchFut = null;
 
@@ -1557,59 +1554,60 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             orderMap.get(order).add(cacheId);
                         }
 
-                        rebFuts = new HashSet<>();
-
                         Runnable r = null;
 
                         List<String> rebList = new LinkedList<>();
 
+                        boolean assignsCancelled = false;
+
                         for (Integer order : orderMap.descendingKeySet()) {
                             for (Integer cacheId : orderMap.get(order)) {
                                 GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
                                 GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
 
+                                if (assigns != null)
+                                    assignsCancelled |= assigns.cancelled();
+
                                 Runnable cur = cacheCtx.preloader().addAssignments(assigns,
                                     forcePreload,
                                     cnt,
                                     r);
 
-                                if (assigns!= null && !assigns.isEmpty())
-                                    rebList.add(cacheCtx.name());
-
-                                rebFuts.add(cacheCtx.preloader().rebalanceFuture());
+                                if (cur != null) {
+                                    rebList.add(U.maskName(cacheCtx.name()));
 
-                                if (cur != null)
                                     r = cur;
+                                }
                             }
                         }
 
-                        boolean rebNeed = !rebList.isEmpty();
-
-                        if (rebNeed) {
+                        if (assignsCancelled) { // Pending exchange.
+                            U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+                        }
+                        else if (r != null) {
                             Collections.reverse(rebList);
 
                             U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
-                        }
-                        else
-                            U.log(log, "Skipping rebalancing (nothing scheduled) " +
-                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
-                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
 
-                        if (futQ.isEmpty()) {
-                            if (rebNeed)
+                            if (futQ.isEmpty()) {
+                                U.log(log, "Rebalancing started " +
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                     ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
 
-                            r.run();
-                        }
-                        else {
-                            if (rebNeed)
+                                r.run(); // Starts rebalancing routine.
+                            }
+                            else
                                 U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                     ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
                         }
+                        else
+                            U.log(log, "Skipping rebalancing (nothing scheduled) " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
                     }
                 }
                 catch (IgniteInterruptedCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/38df717f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index de472da..8144c8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -120,6 +121,11 @@ public class GridDhtPartitionDemander {
     /** Cached rebalance topics. */
     private final Map<Integer, Object> rebalanceTopics;
 
+    /** Stopped event sent.
+     * Make sense for replicated cache only.
+     */
+    private final AtomicBoolean stoppedEvtSent = new AtomicBoolean();
+
     /**
      * @param cctx Cctx.
      * @param demandLock Demand lock.
@@ -266,7 +272,7 @@ public class GridDhtPartitionDemander {
         if (delay == 0 || force) {
             final RebalanceFuture oldFut = rebalanceFut;
 
-            final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt);
+            final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, stoppedEvtSent, cnt);
 
             if (!oldFut.isInitial())
                 oldFut.cancel();
@@ -280,31 +286,40 @@ public class GridDhtPartitionDemander {
 
             rebalanceFut = fut;
 
-            if (assigns.isEmpty()) {
-                ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+            if (assigns.cancelled()) { // Pending exchange.
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing skipped due to cancelled assignments.");
+
+                rebalanceFut.onDone(false);
 
                 rebalanceFut.sendRebalanceFinishedEvent();
+
+                return null;
             }
 
-            return new Runnable() {
-                @Override public void run() {
-                    if (next != null)
-                        rebalanceFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                            @Override public void apply(IgniteInternalFuture<Boolean> fut) {
-                                next.run(); // Starts next cache preloading (according to the order).
-                            }
-                        });
+            if (assigns.isEmpty()) { // Nothing to rebalance.
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing skipped due to empty assignments.");
 
-                    if (assigns.isEmpty()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Rebalancing skipped due to empty assignments.");
+                rebalanceFut.onDone(true);
+
+                ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
 
-                        rebalanceFut.onDone(true);
+                rebalanceFut.sendRebalanceFinishedEvent();
 
-                        return;
-                    }
+                return null;
+            }
 
+            return new Runnable() {
+                @Override public void run() {
                     try {
+                        if (next != null)
+                            rebalanceFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                                @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                                    next.run(); // Starts next cache rebalancing (according to the order).
+                                }
+                            });
+
                         requestPartitions(rebalanceFut, assigns);
                     }
                     catch (ClusterTopologyCheckedException e) {
@@ -756,8 +771,10 @@ public class GridDhtPartitionDemander {
         /** */
         private static final long serialVersionUID = 1L;
 
-        /** Should EVT_CACHE_REBALANCE_STOPPED event be sent of not. */
-        private final boolean sndStoppedEvnt;
+        /**
+         * Should EVT_CACHE_REBALANCE_STOPPED event be sent of not.
+         */
+        private final AtomicBoolean stoppedEvtSent;
 
         /** */
         private final GridCacheContext<?, ?> cctx;
@@ -785,13 +802,13 @@ public class GridDhtPartitionDemander {
          * @param assigns Assigns.
          * @param cctx Context.
          * @param log Logger.
-         * @param sentStopEvnt Stop event flag.
+         * @param stoppedEvtSent Stop event flag.
          * @param updateSeq Update sequence.
          */
         RebalanceFuture(GridDhtPreloaderAssignments assigns,
             GridCacheContext<?, ?> cctx,
             IgniteLogger log,
-            boolean sentStopEvnt,
+            AtomicBoolean stoppedEvtSent,
             long updateSeq) {
             assert assigns != null;
 
@@ -799,7 +816,7 @@ public class GridDhtPartitionDemander {
             this.topVer = assigns.topologyVersion();
             this.cctx = cctx;
             this.log = log;
-            this.sndStoppedEvnt = sentStopEvnt;
+            this.stoppedEvtSent = stoppedEvtSent;
             this.updateSeq = updateSeq;
         }
 
@@ -811,7 +828,7 @@ public class GridDhtPartitionDemander {
             this.topVer = null;
             this.cctx = null;
             this.log = null;
-            this.sndStoppedEvnt = false;
+            this.stoppedEvtSent = null;
             this.updateSeq = -1;
         }
 
@@ -868,7 +885,7 @@ public class GridDhtPartitionDemander {
 
                 remaining.clear();
 
-                checkIsDone(true /* cancelled */, false);
+                checkIsDone(true /* cancelled */);
             }
 
             return true;
@@ -1004,22 +1021,20 @@ public class GridDhtPartitionDemander {
          *
          */
         private void checkIsDone() {
-            checkIsDone(false, false);
+            checkIsDone(false);
         }
 
         /**
          * @param cancelled Is cancelled.
-         * @param wasEmpty {@code True} if future was created without assignments.
          */
-        private void checkIsDone(boolean cancelled, boolean wasEmpty) {
+        private void checkIsDone(boolean cancelled) {
             if (remaining.isEmpty()) {
                 sendRebalanceFinishedEvent();
 
                 if (log.isDebugEnabled())
                     log.debug("Completed rebalance future: " + this);
 
-                if (!wasEmpty)
-                    cctx.shared().exchange().scheduleResendPartitions();
+                cctx.shared().exchange().scheduleResendPartitions();
 
                 Collection<Integer> m = new HashSet<>();
 
@@ -1048,9 +1063,13 @@ public class GridDhtPartitionDemander {
         /**
          *
          */
-        private void sendRebalanceFinishedEvent(){
-            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sndStoppedEvnt))
-                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());;
+        private void sendRebalanceFinishedEvent() {
+            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
+                (!cctx.isReplicated() || !stoppedEvtSent.get())) {
+                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+
+                stoppedEvtSent.set(true);
+            }
         }
 
         /** {@inheritDoc} */


Mime
View raw message