ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [15/15] ignite git commit: IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way
Date Mon, 21 Nov 2016 13:43:03 GMT
IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5536adfd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5536adfd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5536adfd

Branch: refs/heads/ignite-4242
Commit: 5536adfd8d816407bf1c3e0ffa7c7bb40dd2b858
Parents: 88f38ac
Author: Anton Vinogradov <av@apache.org>
Authored: Mon Nov 21 16:42:15 2016 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Mon Nov 21 16:42:15 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 107 +++++--------------
 .../processors/cache/GridCachePreloader.java    |  10 +-
 .../cache/GridCachePreloaderAdapter.java        |   5 +-
 .../dht/preloader/GridDhtPartitionDemander.java |  92 +++++++---------
 .../dht/preloader/GridDhtPreloader.java         |   7 +-
 .../GridCacheRebalancingSyncSelfTest.java       |   2 +
 6 files changed, 81 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5536adfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4eb61e3..eb9f1f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,18 +21,19 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Queue;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -81,7 +82,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -91,7 +91,6 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
@@ -150,9 +149,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** */
     private GridFutureAdapter<?> reconnectExchangeFut;
 
-    /** */
-    private final Queue<Callable<Boolean>> rebalanceQ = new ConcurrentLinkedDeque8<>();
-
     /**
      * Partition map futures.
      * This set also contains already completed exchange futures to address race conditions when coordinator
@@ -1391,7 +1387,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             int cnt = 0;
 
-            IgniteInternalFuture asyncStartFut = null;
+            Set<IgniteInternalFuture<Boolean>> rebFuts = null;
 
             while (!isCancelled()) {
                 GridDhtPartitionsExchangeFuture exchFut = null;
@@ -1546,8 +1542,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (assignsMap != null) {
                         int size = assignsMap.size();
 
-                        rebalanceQ.clear();
-
                         NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
 
                         for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
@@ -1557,95 +1551,52 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                             int order = cacheCtx.config().getRebalanceOrder();
 
-                            if (orderMap.get(order) == null)
-                                orderMap.put(order, new ArrayList<Integer>(size));
+                            orderMap.putIfAbsent(order, new ArrayList<>(size));
 
                             orderMap.get(order).add(cacheId);
                         }
 
-                        Callable<Boolean> marshR = null;
-                        List<Callable<Boolean>> orderedRs = new ArrayList<>(size);
+                        Set<IgniteInternalFuture<Boolean>> prevRebFuts = rebFuts;
 
-                        //Ordered rebalance scheduling.
-                        for (Integer order : orderMap.keySet()) {
-                            for (Integer cacheId : orderMap.get(order)) {
-                                GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+                        rebFuts = new HashSet<>();
 
-                                List<String> waitList = new ArrayList<>(size - 1);
+                        Runnable r = null;
 
-                                for (List<Integer> cIds : orderMap.headMap(order).values()) {
-                                    for (Integer cId : cIds)
-                                        waitList.add(cctx.cacheContext(cId).name());
-                                }
+                        List<String> rebList = new LinkedList<>();
+
+                        for (Integer order : orderMap.descendingKeySet()) {
+                            for (Integer cacheId : orderMap.get(order)) {
+                                GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
-                                Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
+                                Runnable cur = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
                                     forcePreload,
-                                    waitList,
-                                    cnt);
+                                    cnt,
+                                    r);
 
-                                if (r != null) {
-                                    U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
-                                        ", waitList=" + waitList.toString() + "]");
+                                if (cur != null)
+                                    rebList.add(cacheCtx.name());
 
-                                    if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
-                                        marshR = r;
-                                    else
-                                        orderedRs.add(r);
-                                }
+                                rebFuts.add(cacheCtx.preloader().rebalanceFuture());
+
+                                r = cur;
                             }
                         }
 
-                        if (asyncStartFut != null)
-                            asyncStartFut.get(); // Wait for thread stop.
+                        if (prevRebFuts != null) // Wait for previous rebalancing (it's finished or cancelled).
+                            for (IgniteInternalFuture<Boolean> fut : prevRebFuts)
+                                fut.get();
 
-                        rebalanceQ.addAll(orderedRs);
+                        if (r != null) {
+                            Collections.reverse(rebList);
+
+                            U.log(log, "Cache rebalancing scheduled: [order=" + rebList + "]");
 
-                        if (marshR != null || !rebalanceQ.isEmpty()) {
                             if (futQ.isEmpty()) {
                                 U.log(log, "Rebalancing required " +
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                     ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
 
-                                if (marshR != null) {
-                                    try {
-                                        marshR.call(); //Marshaller cache rebalancing launches in sync way.
-                                    }
-                                    catch (Exception ex) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Failed to send initial demand request to node");
-
-                                        continue;
-                                    }
-                                }
-
-                                final GridFutureAdapter fut = new GridFutureAdapter();
-
-                                asyncStartFut = fut;
-
-                                cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
-                                    @Override public Boolean call() {
-                                        try {
-                                            while (true) {
-                                                Callable<Boolean> r = rebalanceQ.poll();
-
-                                                if (r == null)
-                                                    return false;
-
-                                                if (!r.call())
-                                                    return false;
-                                            }
-                                        }
-                                        catch (Exception ex) {
-                                            if (log.isDebugEnabled())
-                                                log.debug("Failed to send initial demand request to node");
-
-                                            return false;
-                                        }
-                                        finally {
-                                            fut.onDone();
-                                        }
-                                    }
-                                }, /*system pool*/true);
+                                r.run(); // Starts rebalancing process.
                             }
                             else {
                                 U.log(log, "Skipping rebalancing (obsolete exchange ID) " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/5536adfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 1d1cfab..874af1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -84,14 +84,14 @@ public interface GridCachePreloader {
      *
      * @param assignments Assignments to add.
      * @param forcePreload Force preload flag.
-     * @param caches Rebalancing of these caches will be finished before this started.
      * @param cnt Counter.
-     * @return Rebalancing closure.
+     * @param next Runnable responsible for cache rebalancing start.
+     * @return Rebalancing runnable.
      */
-    public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
+    public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
         boolean forcePreload,
-        Collection<String> caches,
-        int cnt);
+        int cnt,
+        Runnable next);
 
     /**
      * @param p Preload predicate.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5536adfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index b15ebc5..656a960 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -166,8 +165,8 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
-        Collection<String> caches, int cnt) {
+    @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
+        int cnt, Runnable next) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5536adfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 57d5229..999f99c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -249,45 +248,16 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param name Cache name.
-     * @param fut Future.
-     * @throws IgniteCheckedException If failed.
-     */
-    private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException {
-        if (log.isDebugEnabled())
-            log.debug("Waiting for another cache to start rebalancing [cacheName=" + cctx.name() +
-                ", waitCache=" + name + ']');
-
-        RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name)
-            .preloader().rebalanceFuture();
-
-        if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) {
-            if (!wFut.get()) {
-                U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() +
-                    "] (cache rebalanced with missed partitions)");
-
-                return false;
-            }
-
-            return true;
-        }
-        else {
-            U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() +
-                "] (topology already changed)");
-
-            return false;
-        }
-    }
-
-    /**
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
-     * @param caches Rebalancing of these caches will be finished before this started.
      * @param cnt Counter.
-     * @return Rebalancing closure.
+     * @param next Runnable responsible for cache rebalancing start.
+     * @return Rebalancing runnable.
      */
-    Callable<Boolean> addAssignments(final GridDhtPreloaderAssignments assigns, boolean force,
-        final Collection<String> caches, int cnt) {
+    Runnable addAssignments(final GridDhtPreloaderAssignments assigns,
+        boolean force,
+        int cnt,
+        final Runnable next) {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
 
@@ -310,20 +280,31 @@ public class GridDhtPartitionDemander {
 
             rebalanceFut = fut;
 
-            if (assigns.isEmpty()) {
-                fut.doneIfEmpty(assigns.cancelled());
+            if (next != null)
+                rebalanceFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                        next.run();
+                    }
+                });
 
-                return null;
-            }
+            return new Runnable() {
+                @Override public void run() {
+                    try {
+                        requestPartitions(fut, assigns);
+                    }
+                    catch (IgniteCheckedException e) {
+                        log.error("Failed to send initial demand request to node.", e);
 
-            return new Callable<Boolean>() {
-                @Override public Boolean call() throws Exception {
-                    for (String c : caches) {
-                        if (!waitForCacheRebalancing(c, fut))
-                            return false;
+                        fut.cancel();
                     }
+                    catch (Throwable th) {
+                        log.error("Runtime error caught during initial demand request sending.", th);
+
+                        fut.cancel();
 
-                    return requestPartitions(fut, assigns);
+                        if (th instanceof Error)
+                            throw th;
+                    }
                 }
             };
         }
@@ -361,14 +342,23 @@ public class GridDhtPartitionDemander {
      * @throws IgniteCheckedException If failed.
      * @return Partitions were requested.
      */
-    private boolean requestPartitions(
+    private void requestPartitions(
         RebalanceFuture fut,
         GridDhtPreloaderAssignments assigns
     ) throws IgniteCheckedException {
-        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
-            if (topologyChanged(fut))
-                return false;
+        if (assigns.isEmpty()) {
+            fut.doneIfEmpty(assigns.cancelled());
+
+            return;
+        }
+
+        if (topologyChanged(fut)) {
+            fut.cancel();
 
+            return;
+        }
+
+        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
             final ClusterNode node = e.getKey();
 
             GridDhtPartitionDemandMessage d = e.getValue();
@@ -446,8 +436,6 @@ public class GridDhtPartitionDemander {
                 worker.run(node, d);
             }
         }
-
-        return true;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5536adfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a9245f0..663ad8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -412,9 +411,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
-        boolean forcePreload, Collection<String> caches, int cnt) {
-        return demander.addAssignments(assignments, forcePreload, caches, cnt);
+    @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
+        boolean forcePreload, int cnt, Runnable next) {
+        return demander.addAssignments(assignments, forcePreload, cnt, next);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5536adfd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 5716d59..0edb0f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -501,6 +501,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         record = true;
 
+        log.info("Checking GridDhtPartitions*Message absent (it will take 30 SECONDS) ... ");
+
         U.sleep(30_000);
 
         record = false;


Mime
View raw message