ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [3/7] ignite git commit: IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way
Date Thu, 08 Dec 2016 09:46:04 GMT
IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6ba1711a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6ba1711a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6ba1711a

Branch: refs/heads/master
Commit: 6ba1711a1fa10d8276974227491136070c3ed43a
Parents: acf20b3
Author: Anton Vinogradov <av@apache.org>
Authored: Tue Dec 6 12:55:41 2016 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Tue Dec 6 12:55:41 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 128 +++--------
 .../processors/cache/GridCachePreloader.java    |  11 +-
 .../cache/GridCachePreloaderAdapter.java        |   5 +-
 .../dht/preloader/GridDhtPartitionDemander.java | 230 +++++++++++--------
 .../dht/preloader/GridDhtPreloader.java         |   9 +-
 .../GridCacheRebalancingSyncSelfTest.java       |   2 +
 6 files changed, 183 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6ba1711a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7a24aa1..f04a6ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,18 +21,18 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Queue;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -87,7 +87,6 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -97,13 +96,11 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.getLong;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -156,9 +153,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** */
     private GridFutureAdapter<?> reconnectExchangeFut;
 
-    /** */
-    private final Queue<Callable<Boolean>> rebalanceQ = new ConcurrentLinkedDeque8<>();
-
     /**
      * Partition map futures.
      * This set also contains already completed exchange futures to address race conditions when coordinator
@@ -1596,12 +1590,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
             long timeout = cctx.gridConfig().getNetworkTimeout();
 
-            boolean startEvtFired = false;
-
             int cnt = 0;
 
-            IgniteInternalFuture asyncStartFut = null;
-
             while (!isCancelled()) {
                 GridDhtPartitionsExchangeFuture exchFut = null;
 
@@ -1703,20 +1693,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                     continue;
 
                                 changed |= cacheCtx.topology().afterExchange(exchFut);
-
-                                // Preload event notification.
-                                if (!exchFut.skipPreload() && cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) {
-                                    if (!cacheCtx.isReplicated() || !startEvtFired) {
-                                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
-                                        cacheCtx.events().addPreloadEvent(-1, EVT_CACHE_REBALANCE_STARTED,
-                                            discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
-                                    }
-                                }
                             }
 
-                            startEvtFired = true;
-
                             if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty())
                                 refreshPartitions();
                         }
@@ -1755,8 +1733,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (assignsMap != null) {
                         int size = assignsMap.size();
 
-                        rebalanceQ.clear();
-
                         NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
 
                         for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
@@ -1772,101 +1748,65 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             orderMap.get(order).add(cacheId);
                         }
 
-                        Callable<Boolean> marshR = null;
-                        List<Callable<Boolean>> orderedRs = new ArrayList<>(size);
+                        Runnable r = null;
+
+                        List<String> rebList = new LinkedList<>();
+
+                        boolean assignsCancelled = false;
 
-                        //Ordered rebalance scheduling.
-                        for (Integer order : orderMap.keySet()) {
+                        for (Integer order : orderMap.descendingKeySet()) {
                             for (Integer cacheId : orderMap.get(order)) {
                                 GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
-                                List<String> waitList = new ArrayList<>(size - 1);
+                                GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
 
-                                for (List<Integer> cIds : orderMap.headMap(order).values()) {
-                                    for (Integer cId : cIds)
-                                        waitList.add(cctx.cacheContext(cId).name());
-                                }
+                                if (assigns != null)
+                                    assignsCancelled |= assigns.cancelled();
 
-                                Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
+                                // Cancels previous rebalance future (in case it's not done yet).
+                                // Sends previous rebalance stopped event (if necessary).
+                                // Creates new rebalance future.
+                                // Sends current rebalance started event (if necessary).
+                                // Finishes cache sync future (on empty assignments).
+                                Runnable cur = cacheCtx.preloader().addAssignments(assigns,
                                     forcePreload,
-                                    waitList,
-                                    cnt);
+                                    cnt,
+                                    r);
 
-                                if (r != null) {
-                                    U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
-                                        ", waitList=" + waitList.toString() + "]");
+                                if (cur != null) {
+                                    rebList.add(U.maskName(cacheCtx.name()));
 
-                                    if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
-                                        marshR = r;
-                                    else
-                                        orderedRs.add(r);
+                                    r = cur;
                                 }
                             }
                         }
 
-                        if (asyncStartFut != null)
-                            asyncStartFut.get(); // Wait for thread stop.
+                        if (assignsCancelled) { // Pending exchange.
+                            U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+                        }
+                        else if (r != null) {
+                            Collections.reverse(rebList);
 
-                        rebalanceQ.addAll(orderedRs);
+                            U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
 
-                        if (marshR != null || !rebalanceQ.isEmpty()) {
                             if (futQ.isEmpty()) {
-                                U.log(log, "Rebalancing required " +
+                                U.log(log, "Rebalancing started " +
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                     ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
 
-                                if (marshR != null) {
-                                    try {
-                                        marshR.call(); //Marshaller cache rebalancing launches in sync way.
-                                    }
-                                    catch (Exception ex) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Failed to send initial demand request to node");
-
-                                        continue;
-                                    }
-                                }
-
-                                final GridFutureAdapter fut = new GridFutureAdapter();
-
-                                asyncStartFut = fut;
-
-                                cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
-                                    @Override public Boolean call() {
-                                        try {
-                                            while (true) {
-                                                Callable<Boolean> r = rebalanceQ.poll();
-
-                                                if (r == null)
-                                                    return false;
-
-                                                if (!r.call())
-                                                    return false;
-                                            }
-                                        }
-                                        catch (Exception ex) {
-                                            if (log.isDebugEnabled())
-                                                log.debug("Failed to send initial demand request to node");
-
-                                            return false;
-                                        }
-                                        finally {
-                                            fut.onDone();
-                                        }
-                                    }
-                                }, /*system pool*/true);
+                                r.run(); // Starts rebalancing routine.
                             }
-                            else {
+                            else
                                 U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                     ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-                            }
                         }
-                        else {
+                        else
                             U.log(log, "Skipping rebalancing (nothing scheduled) " +
                                 "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                 ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-                        }
                     }
                 }
                 catch (IgniteInterruptedCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ba1711a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 1d1cfab..3c4456d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -84,14 +83,14 @@ public interface GridCachePreloader {
      *
      * @param assignments Assignments to add.
      * @param forcePreload Force preload flag.
-     * @param caches Rebalancing of these caches will be finished before this started.
      * @param cnt Counter.
-     * @return Rebalancing closure.
+     * @param next Runnable responsible for cache rebalancing start.
+     * @return Rebalancing runnable.
      */
-    public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
+    public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
         boolean forcePreload,
-        Collection<String> caches,
-        int cnt);
+        int cnt,
+        Runnable next);
 
     /**
      * @param p Preload predicate.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ba1711a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index b15ebc5..656a960 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -166,8 +165,8 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
-        Collection<String> caches, int cnt) {
+    @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
+        int cnt, Runnable next) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ba1711a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 57d5229..a6808c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -28,8 +28,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -72,6 +72,7 @@ import org.jetbrains.annotations.Nullable;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
@@ -122,6 +123,18 @@ public class GridDhtPartitionDemander {
     private final Map<Integer, Object> rebalanceTopics;
 
     /**
+     * Started event sent.
+     * Make sense for replicated cache only.
+     */
+    private final AtomicBoolean startedEvtSent = new AtomicBoolean();
+
+    /**
+     * Stopped event sent.
+     * Make sense for replicated cache only.
+     */
+    private final AtomicBoolean stoppedEvtSent = new AtomicBoolean();
+
+    /**
      * @param cctx Cctx.
      * @param demandLock Demand lock.
      */
@@ -249,45 +262,25 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param name Cache name.
-     * @param fut Future.
-     * @throws IgniteCheckedException If failed.
+     * Sets last exchange future.
+     *
+     * @param lastFut Last future to set.
      */
-    private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException {
-        if (log.isDebugEnabled())
-            log.debug("Waiting for another cache to start rebalancing [cacheName=" + cctx.name() +
-                ", waitCache=" + name + ']');
-
-        RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name)
-            .preloader().rebalanceFuture();
-
-        if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) {
-            if (!wFut.get()) {
-                U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() +
-                    "] (cache rebalanced with missed partitions)");
-
-                return false;
-            }
-
-            return true;
-        }
-        else {
-            U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() +
-                "] (topology already changed)");
-
-            return false;
-        }
+    void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
+        lastExchangeFut = lastFut;
     }
 
     /**
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
-     * @param caches Rebalancing of these caches will be finished before this started.
      * @param cnt Counter.
-     * @return Rebalancing closure.
+     * @param next Runnable responsible for cache rebalancing start.
+     * @return Rebalancing runnable.
      */
-    Callable<Boolean> addAssignments(final GridDhtPreloaderAssignments assigns, boolean force,
-        final Collection<String> caches, int cnt) {
+    Runnable addAssignments(final GridDhtPreloaderAssignments assigns,
+        boolean force,
+        int cnt,
+        final Runnable next) {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
 
@@ -296,7 +289,7 @@ public class GridDhtPartitionDemander {
         if (delay == 0 || force) {
             final RebalanceFuture oldFut = rebalanceFut;
 
-            final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt);
+            final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt);
 
             if (!oldFut.isInitial())
                 oldFut.cancel();
@@ -310,20 +303,69 @@ public class GridDhtPartitionDemander {
 
             rebalanceFut = fut;
 
-            if (assigns.isEmpty()) {
-                fut.doneIfEmpty(assigns.cancelled());
+            fut.sendRebalanceStartedEvent();
+
+            if (assigns.cancelled()) { // Pending exchange.
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing skipped due to cancelled assignments.");
+
+                fut.onDone(false);
+
+                fut.sendRebalanceFinishedEvent();
+
+                return null;
+            }
+
+            if (assigns.isEmpty()) { // Nothing to rebalance.
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing skipped due to empty assignments.");
+
+                fut.onDone(true);
+
+                ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+
+                fut.sendRebalanceFinishedEvent();
 
                 return null;
             }
 
-            return new Callable<Boolean>() {
-                @Override public Boolean call() throws Exception {
-                    for (String c : caches) {
-                        if (!waitForCacheRebalancing(c, fut))
-                            return false;
+            return new Runnable() {
+                @Override public void run() {
+                    try {
+                        if (next != null)
+                            fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                                @Override public void apply(IgniteInternalFuture<Boolean> f) {
+                                    try {
+                                        if (f.get()) // Not cancelled.
+                                            next.run(); // Starts next cache rebalancing (according to the order).
+                                    }
+                                    catch (IgniteCheckedException ignored) {
+                                        if (log.isDebugEnabled())
+                                            log.debug(ignored.getMessage());
+                                    }
+                                }
+                            });
+
+                        requestPartitions(fut, assigns);
                     }
+                    catch (IgniteCheckedException e) {
+                        ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class);
 
-                    return requestPartitions(fut, assigns);
+                        if (cause != null)
+                            log.warning("Failed to send initial demand request to node. " + e.getMessage());
+                        else
+                            log.error("Failed to send initial demand request to node.", e);
+
+                        fut.cancel();
+                    }
+                    catch (Throwable th) {
+                        log.error("Runtime error caught during initial demand request sending.", th);
+
+                        fut.cancel();
+
+                        if (th instanceof Error)
+                            throw th;
+                    }
                 }
             };
         }
@@ -361,14 +403,17 @@ public class GridDhtPartitionDemander {
      * @throws IgniteCheckedException If failed.
      * @return Partitions were requested.
      */
-    private boolean requestPartitions(
+    private void requestPartitions(
         RebalanceFuture fut,
         GridDhtPreloaderAssignments assigns
     ) throws IgniteCheckedException {
-        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
-            if (topologyChanged(fut))
-                return false;
+        if (topologyChanged(fut)) {
+            fut.cancel();
+
+            return;
+        }
 
+        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
             final ClusterNode node = e.getKey();
 
             GridDhtPartitionDemandMessage d = e.getValue();
@@ -387,7 +432,7 @@ public class GridDhtPartitionDemander {
 
             //Check remote node rebalancing API version.
             if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
-                U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() +
                     ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
                     ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
 
@@ -446,8 +491,6 @@ public class GridDhtPartitionDemander {
                 worker.run(node, d);
             }
         }
-
-        return true;
     }
 
     /**
@@ -739,23 +782,17 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * Sets last exchange future.
-     *
-     * @param lastFut Last future to set.
-     */
-    void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
-        lastExchangeFut = lastFut;
-    }
-
-    /**
      *
      */
     public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
         /** */
         private static final long serialVersionUID = 1L;
 
-        /** Should EVT_CACHE_REBALANCE_STOPPED event be sent of not. */
-        private final boolean sndStoppedEvnt;
+        /** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */
+        private final AtomicBoolean startedEvtSent;
+
+        /** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */
+        private final AtomicBoolean stoppedEvtSent;
 
         /** */
         private final GridCacheContext<?, ?> cctx;
@@ -783,13 +820,15 @@ public class GridDhtPartitionDemander {
          * @param assigns Assigns.
          * @param cctx Context.
          * @param log Logger.
-         * @param sentStopEvnt Stop event flag.
+         * @param startedEvtSent Start event sent flag.
+         * @param stoppedEvtSent Stop event sent flag.
          * @param updateSeq Update sequence.
          */
         RebalanceFuture(GridDhtPreloaderAssignments assigns,
             GridCacheContext<?, ?> cctx,
             IgniteLogger log,
-            boolean sentStopEvnt,
+            AtomicBoolean startedEvtSent,
+            AtomicBoolean stoppedEvtSent,
             long updateSeq) {
             assert assigns != null;
 
@@ -797,7 +836,8 @@ public class GridDhtPartitionDemander {
             this.topVer = assigns.topologyVersion();
             this.cctx = cctx;
             this.log = log;
-            this.sndStoppedEvnt = sentStopEvnt;
+            this.startedEvtSent = startedEvtSent;
+            this.stoppedEvtSent = stoppedEvtSent;
             this.updateSeq = updateSeq;
         }
 
@@ -809,7 +849,8 @@ public class GridDhtPartitionDemander {
             this.topVer = null;
             this.cctx = null;
             this.log = null;
-            this.sndStoppedEvnt = false;
+            this.startedEvtSent = null;
+            this.stoppedEvtSent = null;
             this.updateSeq = -1;
         }
 
@@ -848,24 +889,6 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         * @param cancelled Is cancelled.
-         */
-        private void doneIfEmpty(boolean cancelled) {
-            synchronized (this) {
-                if (isDone())
-                    return;
-
-                assert remaining.isEmpty();
-
-                if (log.isDebugEnabled())
-                    log.debug("Rebalancing is not required [cache=" + cctx.name() +
-                        ", topology=" + topVer + "]");
-
-                checkIsDone(cancelled, true);
-            }
-        }
-
-        /**
          * Cancels this future.
          *
          * @return {@code True}.
@@ -875,8 +898,7 @@ public class GridDhtPartitionDemander {
                 if (isDone())
                     return true;
 
-                U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
-                    + ", topology=" + topologyVersion() + ']');
+                U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']');
 
                 if (!cctx.kernalContext().isStopping()) {
                     for (UUID nodeId : remaining.keySet())
@@ -885,7 +907,7 @@ public class GridDhtPartitionDemander {
 
                 remaining.clear();
 
-                checkIsDone(true /* cancelled */, false);
+                checkIsDone(true /* cancelled */);
             }
 
             return true;
@@ -907,7 +929,7 @@ public class GridDhtPartitionDemander {
 
                 remaining.remove(nodeId);
 
-                onDone(false); // Finishing rebalance future a non completed.
+                onDone(false); // Finishing rebalance future as non completed.
 
                 checkIsDone(); // But will finish syncFuture only when other nodes are preloaded or rebalancing cancelled.
             }
@@ -988,8 +1010,7 @@ public class GridDhtPartitionDemander {
 
                 if (parts.isEmpty()) {
                     U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
-                        "rebalancing [cache=" + cctx.name() +
-                        ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+                        "rebalancing [fromNode=" + nodeId + ", topology=" + topologyVersion() +
                         ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]"));
 
                     remaining.remove(nodeId);
@@ -1022,23 +1043,20 @@ public class GridDhtPartitionDemander {
          *
          */
         private void checkIsDone() {
-            checkIsDone(false, false);
+            checkIsDone(false);
         }
 
         /**
          * @param cancelled Is cancelled.
-         * @param wasEmpty {@code True} if future was created without assignments.
          */
-        private void checkIsDone(boolean cancelled, boolean wasEmpty) {
+        private void checkIsDone(boolean cancelled) {
             if (remaining.isEmpty()) {
-                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sndStoppedEvnt))
-                    preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+                sendRebalanceFinishedEvent();
 
                 if (log.isDebugEnabled())
                     log.debug("Completed rebalance future: " + this);
 
-                if (!wasEmpty)
-                    cctx.shared().exchange().scheduleResendPartitions();
+                cctx.shared().exchange().scheduleResendPartitions();
 
                 Collection<Integer> m = new HashSet<>();
 
@@ -1064,6 +1082,30 @@ public class GridDhtPartitionDemander {
             }
         }
 
+        /**
+         *
+         */
+        private void sendRebalanceStartedEvent() {
+            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) &&
+                (!cctx.isReplicated() || !startedEvtSent.get())) {
+                preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
+
+                startedEvtSent.set(true);
+            }
+        }
+
+        /**
+         *
+         */
+        private void sendRebalanceFinishedEvent() {
+            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
+                (!cctx.isReplicated() || !stoppedEvtSent.get())) {
+                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+
+                stoppedEvtSent.set(true);
+            }
+        }
+
         /** {@inheritDoc} */
         public String toString() {
             return S.toString(RebalanceFuture.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ba1711a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 0865d9f..692e7c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -255,7 +254,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     @Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
         supplier.onTopologyChanged(lastFut.topologyVersion());
 
-        demander.updateLastExchangeFuture(lastFut);
+        demander.onTopologyChanged(lastFut);
     }
 
     /** {@inheritDoc} */
@@ -413,9 +412,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
-        boolean forcePreload, Collection<String> caches, int cnt) {
-        return demander.addAssignments(assignments, forcePreload, caches, cnt);
+    @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
+        boolean forcePreload, int cnt, Runnable next) {
+        return demander.addAssignments(assignments, forcePreload, cnt, next);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ba1711a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index de38952..3dfcd85 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -501,6 +501,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         record = true;
 
+        log.info("Checking GridDhtPartitions*Message absent (it will take 30 SECONDS) ... ");
+
         U.sleep(30_000);
 
         record = false;


Mime
View raw message