ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 5578
Date Mon, 31 Jul 2017 14:46:29 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 d290f6bf4 -> 4f788be45


5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4f788be4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4f788be4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4f788be4

Branch: refs/heads/ignite-5578
Commit: 4f788be45209e6ee928bf105e1d1e6bde6d70bb4
Parents: d290f6b
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Jul 31 14:41:42 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Jul 31 17:00:22 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |   5 +
 .../GridDhtPartitionsExchangeFuture.java        |  33 ++--
 .../dht/preloader/InitNewCoordinatorFuture.java | 165 +++++++++++--------
 .../distributed/CacheExchangeMergeTest.java     |  41 ++++-
 4 files changed, 150 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4f788be4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4437054..06ce4e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -659,6 +659,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         // Finish all exchange futures.
         ExchangeFutureSet exchFuts0 = exchFuts;
 
+        for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+            if (task instanceof GridDhtPartitionsExchangeFuture)
+                ((GridDhtPartitionsExchangeFuture)task).onDone(stopErr);
+        }
+
         if (exchFuts0 != null) {
             for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
                 f.onDone(stopErr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4f788be4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c3b5d31..87aee9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1499,6 +1499,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         assert node != null;
         assert state == ExchangeLocalState.CRD : state;
 
+        if (msg == null && newCrdFut != null)
+            msg = newCrdFut.joinExchangeMessage(node.id());
+
         UUID nodeId = node.id();
 
         boolean wait = false;
@@ -1512,6 +1515,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 mergedJoinExchMsgs = new LinkedHashMap<>();
 
             if (msg != null) {
+                assert msg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(node.order()));
+
                 log.info("Merge server join exchange, message received [curFut=" + initialVersion() +
                     ", node=" + nodeId + ']');
 
@@ -3033,21 +3038,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (newCrdFut.restoreState()) {
             GridDhtPartitionsFullMessage fullMsg = newCrdFut.fullMessage();
 
-            boolean process = fullMsg == null;
-
             assert msgs.isEmpty() : msgs;
 
-            for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : newCrdFut.messages().entrySet()) {
-                GridDhtPartitionsSingleMessage msg = e.getValue();
-
-                if (!msg.client()) {
-                    msgs.put(e.getKey().id(), e.getValue());
-
-                    if (process)
-                        updatePartitionSingleMap(e.getKey().id(), msg);
-                }
-            }
-
             if (fullMsg != null) {
                 log.info("New coordinator restored state [ver=" + initialVersion() +
                     ", resVer=" + fullMsg.resultTopologyVersion() + ']');
@@ -3080,14 +3072,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         }
                     }
 
-                    sendAllPartitions(fullMsg, msgs.keySet(), newCrdFut.mergedJoinExchangeMessages(), joinedNodeAff);
+                    sendAllPartitions(fullMsg, msgs.keySet(), null, joinedNodeAff);
                 }
 
                 return;
             }
-            else
+            else {
                 log.info("New coordinator restore state finished [ver=" + initialVersion() + ']');
 
+                for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : newCrdFut.messages().entrySet()) {
+                    GridDhtPartitionsSingleMessage msg = e.getValue();
+
+                    if (!msg.client()) {
+                        msgs.put(e.getKey().id(), e.getValue());
+
+                        updatePartitionSingleMap(e.getKey().id(), msg);
+                    }
+                }
+            }
+
             allRcvd = true;
 
             synchronized (this) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4f788be4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index 29c26f7..015f8ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -36,8 +37,10 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.*;
 
 /**
  *
@@ -53,7 +56,7 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
     private Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = new HashMap<>();
 
     /** */
-    private Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs;
+    private Map<UUID, GridDhtPartitionsSingleMessage> joinExchMsgs;
 
     /** */
     private GridFutureAdapter restoreStateFut;
@@ -65,20 +68,15 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
     private AffinityTopologyVersion initTopVer;
 
     /** */
-    private Map<UUID, GridDhtPartitionExchangeId> extraNodes;
+    private Map<UUID, GridDhtPartitionExchangeId> joinedNodes;
 
     /** */
-    // TODO IGNITE-5578 backward compatibility
-    private boolean restoreState = true;
-
-    public boolean restoreState() {
-        return restoreState;
-    }
+    private boolean restoreState;
 
     /**
      * @param cctx Context.
      */
-    public InitNewCoordinatorFuture(GridCacheSharedContext cctx) {
+    InitNewCoordinatorFuture(GridCacheSharedContext cctx) {
         this.log = cctx.logger(getClass());
     }
 
@@ -91,6 +89,8 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
 
         GridCacheSharedContext cctx = exchFut.sharedContext();
 
+        restoreState = exchangeProtocolVersion(exchFut.context().events().discoveryCache().minimumNodeVersion()) > 1;
+
         boolean newAff = exchFut.localJoinExchange();
 
         IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(exchFut, newAff);
@@ -98,68 +98,76 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
         if (fut != null)
             add(fut);
 
-        DiscoCache curDiscoCache = cctx.discovery().discoCache();
+        if (restoreState) {
+            DiscoCache curDiscoCache = cctx.discovery().discoCache();
 
-        DiscoCache discoCache = exchFut.discoCache();
+            DiscoCache discoCache = exchFut.discoCache();
 
-        List<ClusterNode> nodes = new ArrayList<>();
+            List<ClusterNode> nodes = new ArrayList<>();
 
-        synchronized (this) {
-            for (ClusterNode node : discoCache.allNodes()) {
-                if (!node.isLocal() && cctx.discovery().alive(node)) {
-                    awaited.add(node.id());
+            synchronized (this) {
+                for (ClusterNode node : discoCache.allNodes()) {
+                    if (!node.isLocal() && cctx.discovery().alive(node)) {
+                        awaited.add(node.id());
 
-                    nodes.add(node);
+                        nodes.add(node);
+                    }
                 }
-            }
 
-            if (exchFut.context().mergeExchanges() && !curDiscoCache.version().equals(discoCache.version())) {
-                for (ClusterNode node : curDiscoCache.allNodes()) {
-                    if (discoCache.node(node.id()) == null) {
-                        awaited.add(node.id());
+                if (exchFut.context().mergeExchanges() && !curDiscoCache.version().equals(discoCache.version())) {
+                    for (ClusterNode node : curDiscoCache.allNodes()) {
+                        if (discoCache.node(node.id()) == null) {
+                            if (exchangeProtocolVersion(node.version()) == 1)
+                                break;
 
-                        nodes.add(node);
+                            awaited.add(node.id());
+
+                            nodes.add(node);
 
-                        if (extraNodes == null)
-                            extraNodes = new HashMap<>();
+                            if (joinedNodes == null)
+                                joinedNodes = new HashMap<>();
 
-                        GridDhtPartitionExchangeId exchId = new GridDhtPartitionExchangeId(node.id(),
-                            EVT_NODE_JOINED,
-                            new AffinityTopologyVersion(node.order()));
+                            GridDhtPartitionExchangeId exchId = new GridDhtPartitionExchangeId(node.id(),
+                                EVT_NODE_JOINED,
+                                new AffinityTopologyVersion(node.order()));
 
-                        extraNodes.put(node.id(), exchId);
+                            joinedNodes.put(node.id(), exchId);
+                        }
                     }
+
+                    if (joinedNodes == null)
+                        joinedNodes = Collections.emptyMap();
                 }
-            }
 
-            if (!awaited.isEmpty()) {
-                restoreStateFut = new GridFutureAdapter();
+                if (!awaited.isEmpty()) {
+                    restoreStateFut = new GridFutureAdapter();
 
-                add(restoreStateFut);
+                    add(restoreStateFut);
+                }
             }
-        }
 
-        if (!nodes.isEmpty()) {
-            GridDhtPartitionsSingleRequest req = GridDhtPartitionsSingleRequest.restoreStateRequest(exchFut.exchangeId(),
-                exchFut.exchangeId());
+            if (!nodes.isEmpty()) {
+                GridDhtPartitionsSingleRequest req = GridDhtPartitionsSingleRequest.restoreStateRequest(exchFut.exchangeId(),
+                    exchFut.exchangeId());
 
-            for (ClusterNode node : nodes) {
-                try {
-                    GridDhtPartitionsSingleRequest sndReq = req;
+                for (ClusterNode node : nodes) {
+                    try {
+                        GridDhtPartitionsSingleRequest sndReq = req;
 
-                    if (extraNodes != null && extraNodes.containsKey(node.id())) {
-                        sndReq = GridDhtPartitionsSingleRequest.restoreStateRequest(
-                            extraNodes.get(node.id()),
-                            exchFut.exchangeId());
-                    }
+                        if (joinedNodes.containsKey(node.id())) {
+                            sndReq = GridDhtPartitionsSingleRequest.restoreStateRequest(
+                                joinedNodes.get(node.id()),
+                                exchFut.exchangeId());
+                        }
 
-                    cctx.io().send(node, sndReq, GridIoPolicy.SYSTEM_POOL);
-                }
-                catch (ClusterTopologyCheckedException e) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to send partitions request, node failed: " + node);
+                        cctx.io().send(node, sndReq, GridIoPolicy.SYSTEM_POOL);
+                    }
+                    catch (ClusterTopologyCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send partitions request, node failed: " + node);
 
-                    onNodeLeft(node.id());
+                        onNodeLeft(node.id());
+                    }
                 }
             }
         }
@@ -167,6 +175,10 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
         markInitialized();
     }
 
+    boolean restoreState() {
+        return restoreState;
+    }
+
     /**
      * @return Received messages.
      */
@@ -174,10 +186,6 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
         return msgs;
     }
 
-    Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchangeMessages() {
-        return mergedJoinExchMsgs;
-    }
-
     /**
      * @return Full message is some of nodes received it from previous coordinator.
      */
@@ -220,34 +228,51 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
     }
 
     private void onAllReceived() {
-        AffinityTopologyVersion resVer = fullMsg != null ? fullMsg.resultTopologyVersion() : initTopVer;
+        if (fullMsg != null) {
+            AffinityTopologyVersion resVer = fullMsg.resultTopologyVersion();
 
-        for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); it.hasNext();) {
-            Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next();
+            for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); it.hasNext();) {
+                Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next();
 
-            GridDhtPartitionsSingleMessage msg = e.getValue();
+                GridDhtPartitionExchangeId msgVer = joinedNodes.get(e.getKey().id());
 
-            GridDhtPartitionExchangeId msgVer = extraNodes != null ? extraNodes.get(e.getKey().id()) : null;
-
-            if (msgVer != null) {
-                if (msgVer.topologyVersion().compareTo(resVer) < 0) {
-                    it.remove();
+                if (msgVer != null) {
+                    assert msgVer.topologyVersion().compareTo(initTopVer) > 0 : msgVer;
 
-                    continue;
+                    if (msgVer.topologyVersion().compareTo(resVer) > 0)
+                        it.remove();
+                    else
+                        e.getValue().exchangeId(msgVer);
                 }
+            }
+        }
+        else {
+            for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); it.hasNext();) {
+                Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next();
+
+                GridDhtPartitionExchangeId msgVer = joinedNodes.get(e.getKey().id());
+
+                if (msgVer != null) {
+                    it.remove();
 
-                assert msgVer.topologyVersion().compareTo(initTopVer) > 0 : msgVer;
+                    assert msgVer.topologyVersion().compareTo(initTopVer) > 0 : msgVer;
 
-                if (mergedJoinExchMsgs == null)
-                    mergedJoinExchMsgs = new HashMap<>();
+                    if (joinExchMsgs == null)
+                        joinExchMsgs = new HashMap<>();
 
-                msg.exchangeId(msgVer);
+                    e.getValue().exchangeId(msgVer);
 
-                mergedJoinExchMsgs.put(e.getKey().id(), msg);
+                    joinExchMsgs.put(e.getKey().id(), e.getValue());
+                }
             }
+
         }
     }
 
+    @Nullable GridDhtPartitionsSingleMessage joinExchangeMessage(UUID nodeId) {
+        return joinExchMsgs != null ? joinExchMsgs.get(nodeId) : null;
+    }
+
     /**
      * @param nodeId Failed node ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4f788be4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 9c098ee..0de0f9e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -83,6 +84,9 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     /** */
     private boolean cfgCache = true;
 
+    /** */
+    private IgniteClosure<String, Boolean> clientC;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -94,6 +98,9 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         Boolean clientMode = client.get();
 
+        if (clientMode == null && clientC != null)
+            clientMode = clientC.apply(igniteInstanceName);
+
         if (clientMode != null) {
             cfg.setClientMode(clientMode);
 
@@ -240,31 +247,47 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testStartCacheOnJoinAndJoinMerge1() throws Exception {
-        startCacheOnJoinAndJoinMerge1(2);
+    public void testStartCacheOnJoinAndJoinMerge_2_nodes() throws Exception {
+        startCacheOnJoinAndJoinMerge1(2, false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testStartCacheOnJoinAndJoinMerge1_1() throws Exception {
-        startCacheOnJoinAndJoinMerge1(4);
+    public void testStartCacheOnJoinAndJoinMerge_4_nodes() throws Exception {
+        startCacheOnJoinAndJoinMerge1(4, false);
     }
 
     /**
-     * @param srvs Number of servers to start.
      * @throws Exception If failed.
      */
-    private void startCacheOnJoinAndJoinMerge1(int srvs) throws Exception {
+    public void testStartCacheOnJoinAndJoinMerge_WithClients() throws Exception {
+        startCacheOnJoinAndJoinMerge1(5, true);
+    }
+
+    /**
+     * @param nodes Number of nodes to start.
+     * @param withClients If {@code true} starts both servers and clients.
+     * @throws Exception If failed.
+     */
+    private void startCacheOnJoinAndJoinMerge1(int nodes, boolean withClients) throws Exception {
         cfgCache = false;
 
         final IgniteEx srv0 = startGrid(0);
 
-        mergeExchangeWaitVersion(srv0, srvs + 1);
+        mergeExchangeWaitVersion(srv0, nodes + 1);
+
+        if (withClients) {
+            clientC = new IgniteClosure<String, Boolean>() {
+                @Override public Boolean apply(String nodeName) {
+                    return getTestIgniteInstanceIndex(nodeName) % 2 == 0;
+                }
+            };
+        }
 
         cfgCache = true;
 
-        IgniteInternalFuture fut = startGrids(srv0, 1, srvs);
+        IgniteInternalFuture fut = startGrids(srv0, 1, nodes);
 
         fut.get();
 
@@ -431,7 +454,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     private void mergeJoinExchangesCoordinatorChange1(final int srvs, CoordinatorChangeMode mode)
         throws Exception
     {
-        log.info("mergeJoinExchangesCoordinatorChange1 [srvs=" + srvs + ", mode=" + mode + ']');
+        log.info("Test mergeJoinExchangesCoordinatorChange1 [srvs=" + srvs + ", mode=" + mode + ']');
 
         testSpi = true;
 


Mime
View raw message