ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: 5578
Date Wed, 02 Aug 2017 20:28:44 GMT
5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/354b8682
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/354b8682
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/354b8682

Branch: refs/heads/ignite-5578
Commit: 354b8682d97b04c642abd377afd6591727ebb12b
Parents: de050c7
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Aug 2 23:28:30 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Aug 2 23:28:30 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 45 ++++++++++++++++++--
 .../GridDhtPartitionsExchangeFuture.java        |  6 ++-
 2 files changed, 46 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/354b8682/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index c8235ac..e4508ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -641,6 +641,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
+        exchWorker.onKernalStop();
+
         cctx.gridEvents().removeDiscoveryEventListener(discoLsnr);
 
         cctx.io().removeHandler(false, 0, GridDhtPartitionsSingleMessage.class);
@@ -1767,11 +1769,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         this.exchMergeTestWaitVer = exchMergeTestWaitVer;
     }
 
-    public void mergeExchanges(final GridDhtPartitionsExchangeFuture curFut, GridDhtPartitionsFullMessage msg)
+    /**
+     * @param curFut Current exchange future.
+     * @param msg Message.
+     * @return {@code True} if node is stopping.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    public boolean mergeExchanges(final GridDhtPartitionsExchangeFuture curFut, GridDhtPartitionsFullMessage msg)
         throws IgniteInterruptedCheckedException {
         AffinityTopologyVersion resVer = msg.resultTopologyVersion();
 
-        exchWorker.waitForExchangeFuture(resVer);
+        if (exchWorker.waitForExchangeFuture(resVer))
+            return true;
 
         for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
             if (task instanceof GridDhtPartitionsExchangeFuture) {
@@ -1811,6 +1820,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         assert evts.topologyVersion().equals(resVer) : "Invalid exchange merge result [ver=" + evts.topologyVersion()
             + ", expVer=" + resVer + ']';
+
+        return false;
     }
 
     /**
@@ -1944,6 +1955,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         /** */
         private boolean crd;
 
+        /** */
+        private boolean stop;
+
         /**
          * Constructor.
          */
@@ -1989,13 +2003,36 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 log.debug("Added exchange future to exchange worker: " + exchFut);
         }
 
-        private void waitForExchangeFuture(AffinityTopologyVersion resVer) throws IgniteInterruptedCheckedException {
+        /**
+         *
+         */
+        private void onKernalStop() {
+            synchronized (this) {
+                stop = true;
+
+                notifyAll();
+            }
+        }
+
+        /**
+         * @param resVer Version to wait for.
+         * @return {@code True} if node is stopping.
+         * @throws IgniteInterruptedCheckedException If interrupted.
+         */
+        private boolean waitForExchangeFuture(AffinityTopologyVersion resVer) throws IgniteInterruptedCheckedException {
             synchronized (this) {
-                while (lastFutVer.compareTo(resVer) < 0)
+                while (!stop && lastFutVer.compareTo(resVer) < 0)
                     U.wait(this);
+
+                return stop;
             }
         }
 
+        /**
+         * @param resVer Exchange result version.
+         * @param exchFut Exchange future.
+         * @throws IgniteInterruptedCheckedException If interrupted.
+         */
         private void onExchangeDone(AffinityTopologyVersion resVer, GridDhtPartitionsExchangeFuture exchFut)
             throws IgniteInterruptedCheckedException {
             if (resVer.compareTo(exchFut.exchangeId().topologyVersion()) != 0) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/354b8682/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6f77f96..5191557 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2645,7 +2645,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                     resTopVer = msg.resultTopologyVersion();
 
-                    cctx.exchange().mergeExchanges(this, msg);
+                    if (cctx.exchange().mergeExchanges(this, msg)) {
+                        assert cctx.kernalContext().isStopping();
+
+                        return; // Node is stopping, no need to further process exchange.
+                    }
 
                     assert resTopVer.equals(exchCtx.events().topologyVersion()) :  "Unexpected result version [" +
                         "msgVer=" + resTopVer +


Mime
View raw message