ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 5578
Date Thu, 03 Aug 2017 16:39:54 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 51a95a14c -> 489d0aca0


5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/489d0aca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/489d0aca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/489d0aca

Branch: refs/heads/ignite-5578
Commit: 489d0aca0752ffe2e7c26cfb9a9389b3731c2cef
Parents: 51a95a1
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Aug 3 19:39:40 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Aug 3 19:39:40 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 28 +++++----
 .../GridDhtPartitionsExchangeFuture.java        | 11 ++++
 .../distributed/CacheExchangeMergeTest.java     | 66 ++++++++++++++++++++
 3 files changed, 94 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/489d0aca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index efb04a7..48909d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -128,7 +128,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.preloa
  */
 public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedManagerAdapter<K,
V> {
     /** Exchange history size. */
-    private static final int EXCHANGE_HISTORY_SIZE =
+    private final int EXCHANGE_HISTORY_SIZE =
         IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE,
1_000);
 
     /** */
@@ -185,7 +185,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * discovery event. In case if remote node will retry partition exchange, completed future
will indicate
      * that full partition map should be sent to requesting node right away.
      */
-    private ExchangeFutureSet exchFuts = new ExchangeFutureSet();
+    private ExchangeFutureSet exchFuts = new ExchangeFutureSet(EXCHANGE_HISTORY_SIZE);
 
     /** */
     private volatile IgniteCheckedException stopErr;
@@ -1816,8 +1816,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (pendingMsg != null)
                         curFut.waitAndReplyToNode(evt.eventNode().id(), pendingMsg);
                 }
-
-                exchWorker.futQ.remove(fut);
             }
         }
 
@@ -1927,8 +1925,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         if (fut.mergeJoinExchange(curFut))
                             awaited++;
                     }
-
-                    exchWorker.futQ.remove(fut);
                 }
                 else {
                     if (!task.skipForExchangeMerge()) {
@@ -2039,7 +2035,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
          * @param exchFut Exchange future.
          * @throws IgniteInterruptedCheckedException If interrupted.
          */
-        private void onExchangeDone(AffinityTopologyVersion resVer, GridDhtPartitionsExchangeFuture
exchFut)
+        private void removeMergedFutures(AffinityTopologyVersion resVer, GridDhtPartitionsExchangeFuture
exchFut)
             throws IgniteInterruptedCheckedException {
             if (resVer.compareTo(exchFut.initialVersion()) != 0) {
                 waitForExchangeFuture(resVer);
@@ -2048,8 +2044,11 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                     if (task instanceof GridDhtPartitionsExchangeFuture) {
                         GridDhtPartitionsExchangeFuture fut0 = (GridDhtPartitionsExchangeFuture)task;
 
-                        if (resVer.compareTo(fut0.initialVersion()) >= 0)
+                        if (resVer.compareTo(fut0.initialVersion()) >= 0) {
+                            fut0.finishMerged();
+
                             futQ.remove(fut0);
+                        }
                         else
                             break;
                     }
@@ -2262,7 +2261,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 }
                             }
 
-                            onExchangeDone(resVer, exchFut);
+                            removeMergedFutures(resVer, exchFut);
 
                             if (log.isDebugEnabled())
                                 log.debug("After waiting for exchange future [exchFut=" +
exchFut + ", worker=" +
@@ -2478,10 +2477,15 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
         /** */
         private static final long serialVersionUID = 0L;
 
+        /** */
+        private final int histSize;
+
         /**
          * Creates ordered, not strict list set.
+         *
+         * @param histSize Max history size.
          */
-        private ExchangeFutureSet() {
+        private ExchangeFutureSet(int histSize) {
             super(new Comparator<GridDhtPartitionsExchangeFuture>() {
                 @Override public int compare(
                     GridDhtPartitionsExchangeFuture f1,
@@ -2497,6 +2501,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     return t2.compareTo(t1);
                 }
             }, /*not strict*/false);
+
+            this.histSize = histSize;
         }
 
         /**
@@ -2507,7 +2513,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             GridDhtPartitionsExchangeFuture fut) {
             GridDhtPartitionsExchangeFuture cur = super.addx(fut);
 
-            while (size() > EXCHANGE_HISTORY_SIZE) {
+            while (size() > histSize) {
                 GridDhtPartitionsExchangeFuture last = last();
 
                 if (!last.isDone())

http://git-wip-us.apache.org/repos/asf/ignite/blob/489d0aca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 528a85b..546b17b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1373,6 +1373,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         return done.get();
     }
 
+    /**
+     * Finish merged future to allow GridCachePartitionExchangeManager.ExchangeFutureSet
cleanup.
+     */
+    public void finishMerged() {
+        super.onDone(null, null);
+    }
+
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable
err) {
         if (!done.compareAndSet(false, true))
@@ -1515,6 +1522,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         partReleaseFut = null;
         changeGlobalStateE = null;
         exchActions = null;
+        mergedJoinExchMsgs = null;
+        pendingJoinMsg = null;
+        exchCtx = null;
+        newCrdFut = null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/489d0aca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index ec41060..c64377c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -73,6 +74,7 @@ import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.eclipse.jetty.util.ConcurrentHashSet;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -539,6 +541,70 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testMergeAndHistoryCleanup() throws Exception {
+        final int histSize = 5;
+
+        String oldHistVal = System.getProperty(IGNITE_EXCHANGE_HISTORY_SIZE);
+
+        System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, String.valueOf(histSize));
+
+        try {
+            final Ignite srv0 = startGrid(0);
+
+            int topVer = 1;
+
+            for (int i = 0; i < 3; i++) {
+                mergeExchangeWaitVersion(srv0, topVer + 3);
+
+                startGrids(srv0, topVer, 3).get();
+
+                topVer += 3;
+            }
+
+            checkHistorySize(histSize);
+
+            awaitPartitionMapExchange();
+
+            checkHistorySize(histSize);
+
+            mergeExchangeWaitVersion(srv0, topVer + 2);
+
+            stopGrid(1);
+            stopGrid(2);
+
+            checkHistorySize(histSize);
+
+            awaitPartitionMapExchange();
+
+            checkHistorySize(histSize);
+        }
+        finally {
+            if (oldHistVal != null)
+                System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, oldHistVal);
+            else
+                System.clearProperty(IGNITE_EXCHANGE_HISTORY_SIZE);
+        }
+    }
+
+    /**
+     * @param histSize History size.
+     */
+    private void checkHistorySize(int histSize) {
+        List<Ignite> nodes = G.allGrids();
+
+        assertTrue(nodes.size() > 0);
+
+        for (Ignite node : nodes) {
+            List<GridDhtPartitionsExchangeFuture> exchFuts =
+                    ((IgniteEx)node).context().cache().context().exchange().exchangeFutures();
+
+            assertTrue("Unexpected size: " + exchFuts.size(), exchFuts.size() > 0 &&
exchFuts.size() <= histSize);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testStartCacheOnJoinAndFailMerge() throws Exception {
         cfgCache = false;
 


Mime
View raw message