ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5578
Date Wed, 02 Aug 2017 15:48:53 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 3dc2a17d6 -> 39ec42c7e


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39ec42c7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39ec42c7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39ec42c7

Branch: refs/heads/ignite-5578
Commit: 39ec42c7e42ff504c2dccc291bec1a7fb6d54974
Parents: 3dc2a17
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Aug 2 18:22:05 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Aug 2 18:48:47 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 12 +++--
 .../GridDhtPartitionsExchangeFuture.java        | 48 +++++++++++---------
 .../distributed/CacheExchangeMergeTest.java     |  6 +--
 3 files changed, 38 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/39ec42c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index b28cd95..a785477 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1239,13 +1239,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         return grpHolder.affinity();
     }
 
-    public void mergeExchangesOnServerLeft(final GridDhtPartitionsExchangeFuture fut, final GridDhtPartitionsFullMessage msg) {
+    /**
+     * @param fut Current exchange future.
+     * @param msg Finish exchange message.
+     */
+    public void mergeExchangesOnServerLeft(final GridDhtPartitionsExchangeFuture fut,
+        final GridDhtPartitionsFullMessage msg) {
         final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
 
         final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 
-        log.info("mergeExchangesOnServerLeft [topVer=" + fut.context().events().discoveryCache().version() + ']');
-
         forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                 ExchangeDiscoveryEvents evts = fut.context().events();
@@ -2045,7 +2048,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param topVer Topology version.
      * @param fut Exchange future.
+     * @param c Closure converting affinity diff.
+     * @param initAff {@code True} if need initialize affinity.
      * @return Affinity assignment.
      * @throws IgniteCheckedException If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/39ec42c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 1a5a8e2..3579f3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -130,6 +130,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private final Set<UUID> remaining = new HashSet<>();
 
+    /** */
+    @GridToStringExclude
+    private final Object mux = new Object();
+
     /** Guarded by this */
     @GridToStringExclude
     private int pendingSingleUpdates;
@@ -805,7 +809,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 changeGlobalStateE = e;
 
                 if (crd) {
-                    synchronized (this) {
+                    synchronized (mux) {
                         changeGlobalStateExceptions.put(cctx.localNodeId(), e);
                     }
                 }
@@ -1567,7 +1571,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     public boolean mergeJoinExchange(GridDhtPartitionsExchangeFuture fut) {
         boolean wait;
 
-        synchronized (this) {
+        synchronized (mux) {
             assert (!isDone() && !initFut.isDone()) || cctx.kernalContext().isStopping() : this;
             assert (mergedWith == null && state == null) || cctx.kernalContext().isStopping() : this;
 
@@ -1588,7 +1592,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @return Pending join request if any.
      */
     @Nullable public GridDhtPartitionsSingleMessage mergeJoinExchangeOnDone(GridDhtPartitionsExchangeFuture fut) {
-        synchronized (this) {
+        synchronized (mux) {
             assert !isDone();
             assert !initFut.isDone();
             assert mergedWith == null;
@@ -1617,7 +1621,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         FinishState finishState0 = null;
 
-        synchronized (this) {
+        synchronized (mux) {
             if (state == ExchangeLocalState.DONE) {
                 assert finishState != null;
 
@@ -1672,7 +1676,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (msg.restoreState()) {
             InitNewCoordinatorFuture newCrdFut0;
 
-            synchronized (this) {
+            synchronized (mux) {
                 assert newCrdFut != null;
 
                 newCrdFut0 = newCrdFut;
@@ -1691,7 +1695,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         GridDhtPartitionsExchangeFuture mergedWith0 = null;
 
-        synchronized (this) {
+        synchronized (mux) {
             if (state == ExchangeLocalState.MERGED) {
                 assert mergedWith != null;
 
@@ -1733,7 +1737,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                 FinishState finishState0;
 
-                synchronized (GridDhtPartitionsExchangeFuture.this) {
+                synchronized (mux) {
                     finishState0 = finishState;
                 }
 
@@ -1758,7 +1762,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 FinishState finishState0;
 
-                synchronized (GridDhtPartitionsExchangeFuture.this) {
+                synchronized (mux) {
                     finishState0 = finishState;
                 }
 
@@ -1794,7 +1798,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         FinishState finishState0 = null;
 
-        synchronized (this) {
+        synchronized (mux) {
             assert crd != null;
 
             switch (state) {
@@ -1858,7 +1862,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     updatePartitionSingleMap(nodeId, msg);
             }
             finally {
-                synchronized (this) {
+                synchronized (mux) {
                     assert pendingSingleUpdates > 0;
 
                     pendingSingleUpdates--;
@@ -2154,7 +2158,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null;
 
             if (exchCtx.mergeExchanges()) {
-                synchronized (this) {
+                synchronized (mux) {
                     if (mergedJoinExchMsgs != null) {
                         for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet()) {
                             msgs.put(e.getKey(), e.getValue());
@@ -2256,7 +2260,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             msg.prepareMarshal(cctx);
 
-            synchronized (this) {
+            synchronized (mux) {
                 finishState = new FinishState(crd.id(), resTopVer, msg);
 
                 state = ExchangeLocalState.DONE;
@@ -2282,7 +2286,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs0;
 
-                synchronized (this) {
+                synchronized (mux) {
                     srvNodes.remove(cctx.localNode());
 
                     nodes = U.newHashSet(srvNodes.size());
@@ -2461,7 +2465,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) {
         FinishState finishState0 = null;
 
-        synchronized (this) {
+        synchronized (mux) {
             if (crd == null) {
                 log.info("Ignore partitions request, no coordinator [node=" + node.id() + ']');
 
@@ -2577,7 +2581,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (checkCrd) {
                 assert node != null;
 
-                synchronized (this) {
+                synchronized (mux) {
                     if (crd == null) {
                         log.info("Ignore full message, all server nodes left: " + msg);
 
@@ -2881,14 +2885,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                         InitNewCoordinatorFuture newCrdFut0;
 
-                        synchronized (this) {
+                        synchronized (mux) {
                             newCrdFut0 = newCrdFut;
                         }
 
                         if (newCrdFut0 != null)
                             newCrdFut0.onNodeLeft(node.id());
 
-                        synchronized (this) {
+                        synchronized (mux) {
                             if (!srvNodes.remove(node))
                                 return;
 
@@ -3046,7 +3050,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 log.info("New coordinator restored state [ver=" + initialVersion() +
                     ", resVer=" + fullMsg.resultTopologyVersion() + ']');
 
-                synchronized (this) {
+                synchronized (mux) {
                     state = ExchangeLocalState.DONE;
 
                     finishState = new FinishState(crd.id(), fullMsg.resultTopologyVersion(), fullMsg);
@@ -3097,7 +3101,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             allRcvd = true;
 
-            synchronized (this) {
+            synchronized (mux) {
                 remaining.clear(); // Do not process messages.
 
                 assert crd != null && crd.isLocal();
@@ -3110,7 +3114,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         else {
             Set<UUID> remaining0 = null;
 
-            synchronized (this) {
+            synchronized (mux) {
                 assert crd != null && crd.isLocal();
 
                 state = ExchangeLocalState.CRD;
@@ -3215,7 +3219,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             ClusterNode crd;
             Set<UUID> remaining;
 
-            synchronized (this) {
+            synchronized (mux) {
                 crd = this.crd;
                 remaining = new HashSet<>(this.remaining);
             }
@@ -3249,7 +3253,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     @Override public String toString() {
         Set<UUID> remaining;
 
-        synchronized (this) {
+        synchronized (mux) {
             remaining = new HashSet<>(this.remaining);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/39ec42c7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index ac9390e..b32164f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -1148,7 +1148,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
                             ", cache=" + cacheName + ']';
 
                     for (int i = 0; i < 5; i++) {
-                        Integer key = rnd.nextInt(100_000);
+                        Integer key = rnd.nextInt(20_000);
 
                         cache.put(key, i);
 
@@ -1161,7 +1161,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
                         Map<Integer, Integer> map = new TreeMap<>();
 
                         for (int j = 0; j < 10; j++) {
-                            Integer key = rnd.nextInt(100_000);
+                            Integer key = rnd.nextInt(20_000);
 
                             map.put(key, i);
                         }
@@ -1204,7 +1204,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         try (Transaction tx = node.transactions().txStart(concurrency, isolation)) {
             for (int i = 0; i < 5; i++) {
-                Integer key = rnd.nextInt(100_000);
+                Integer key = rnd.nextInt(20_000);
 
                 cache.put(key, i);
 


Mime
View raw message