ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1027 Fixed early rebalance sync future completion.
Date Wed, 02 Dec 2015 10:24:46 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1027 13276ac2e -> 82947c75e


ignite-1027 Fixed early rebalance sync future completion.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/82947c75
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/82947c75
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/82947c75

Branch: refs/heads/ignite-1027
Commit: 82947c75e305adebe5a6bfa33bb6d98872f41e06
Parents: 13276ac
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Dec 2 13:16:23 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Dec 2 13:16:23 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 35 +++++++++-----------
 .../dht/preloader/GridDhtPartitionDemander.java | 12 +++----
 .../dht/preloader/GridDhtPreloader.java         |  4 ++-
 .../preloader/GridDhtPreloaderAssignments.java  | 19 ++++++++++-
 4 files changed, 41 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/82947c75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 17abace..9142480 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1353,8 +1353,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 if (delay == 0 || forcePreload) {
                                     GridDhtPreloaderAssignments assigns = cacheCtx.preloader().assign(exchFut);
 
-                                    if (assigns != null)
-                                        assignsMap.put(cacheCtx.cacheId(), assigns);
+                                    assignsMap.put(cacheCtx.cacheId(), assigns);
                                 }
                             }
                         }
@@ -1399,27 +1398,26 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                         waitList.add(cctx.cacheContext(cId).name());
                                 }
 
-                                GridDhtPreloaderAssignments assignments = assignsMap.get(cacheId);
+                                Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
+                                    forcePreload,
+                                    waitList,
+                                    cnt);
 
-                                if (assignments != null) {
-                                    Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignments,
-                                        forcePreload,
-                                        waitList,
-                                        cnt);
+                                if (r != null) {
+                                    U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
+                                        ", waitList=" + waitList.toString() + "]");
 
-                                    if (r != null) {
-                                        U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
-                                            ", waitList=" + waitList.toString() + "]");
-
-                                        if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
-                                            marshR = r;
-                                        else
-                                            orderedRs.add(r);
-                                    }
+                                    if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
+                                        marshR = r;
+                                    else
+                                        orderedRs.add(r);
                                 }
                             }
                         }
 
+                        if (asyncStartFut != null)
+                            asyncStartFut.get(); // Wait for thread stop.
+
                         rebalanceQ.addAll(orderedRs);
 
                         if (marshR != null || !rebalanceQ.isEmpty()) {
@@ -1440,9 +1438,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                     }
                                 }
 
-                                if (asyncStartFut != null)
-                                    asyncStartFut.get(); // Wait for thread stop.
-
                                 final GridFutureAdapter fut = new GridFutureAdapter();
 
                                 asyncStartFut = fut;

http://git-wip-us.apache.org/repos/asf/ignite/blob/82947c75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 6b923d0..b46979db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -294,8 +294,6 @@ public class GridDhtPartitionDemander {
         long delay = cctx.config().getRebalanceDelay();
 
         if (delay == 0 || force) {
-            assert assigns != null;
-
             final RebalanceFuture oldFut = rebalanceFut;
 
             final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt);
@@ -313,7 +311,7 @@ public class GridDhtPartitionDemander {
             rebalanceFut = fut;
 
             if (assigns.isEmpty()) {
-                fut.doneIfEmpty();
+                fut.doneIfEmpty(assigns.cancelled());
 
                 return null;
             }
@@ -841,9 +839,9 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         *
+         * @param cancelled Is cancelled.
          */
-        private void doneIfEmpty() {
+        private void doneIfEmpty(boolean cancelled) {
             synchronized (this) {
                 if (isDone())
                     return;
@@ -854,14 +852,14 @@ public class GridDhtPartitionDemander {
                     log.debug("Rebalancing is not required [cache=" + cctx.name() +
                         ", topology=" + topVer + "]");
 
-                checkIsDone();
+                checkIsDone(cancelled);
             }
         }
 
         /**
          * Cancels this future.
          *
-         * @return {@code true}.
+         * @return {@code True}.
          */
         @Override public boolean cancel() {
             synchronized (this) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/82947c75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 14734d5..9a6246f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -324,7 +324,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
                         exchFut.exchangeId());
 
-                return null;
+                assigns.cancelled(true);
+
+                return assigns;
             }
 
             // If partition belongs to local node.

http://git-wip-us.apache.org/repos/asf/ignite/blob/82947c75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 3583967..3f82c9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -37,19 +37,36 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode,
     /** Last join order. */
     private final AffinityTopologyVersion topVer;
 
+    /** */
+    private boolean cancelled;
+
     /**
      * @param exchFut Exchange future.
      * @param topVer Last join order.
      */
     public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture exchFut, AffinityTopologyVersion topVer) {
         assert exchFut != null;
-        assert topVer.topologyVersion() > 0;
+        assert topVer.topologyVersion() > 0 : topVer;
 
         this.exchFut = exchFut;
         this.topVer = topVer;
     }
 
     /**
+     * @return {@code True} if assignments creation was cancelled.
+     */
+    public boolean cancelled() {
+        return cancelled;
+    }
+
+    /**
+     * @param cancelled {@code True} if assignments creation was cancelled.
+     */
+    public void cancelled(boolean cancelled) {
+        this.cancelled = cancelled;
+    }
+
+    /**
      * @return Exchange future.
      */
     GridDhtPartitionsExchangeFuture exchangeFuture() {


Mime
View raw message