ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 5578
Date Mon, 24 Jul 2017 16:24:55 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 b681ca663 -> ccb855ea0


5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ccb855ea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ccb855ea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ccb855ea

Branch: refs/heads/ignite-5578
Commit: ccb855ea099288c9e48beb980e5f51c03a5a5046
Parents: b681ca6
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Jul 24 19:19:58 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Jul 24 19:19:58 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java |   2 +-
 .../cache/CacheAffinitySharedManager.java       |   2 +-
 .../GridCachePartitionExchangeManager.java      |  14 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   2 +-
 .../preloader/GridDhtPartitionExchangeId.java   |  11 ++
 .../GridDhtPartitionsExchangeFuture.java        | 101 ++++++++++---
 .../preloader/GridDhtPartitionsFullMessage.java |   8 +-
 .../GridDhtPartitionsSingleRequest.java         |  48 +++++-
 .../dht/preloader/InitNewCoordinatorFuture.java |  95 +++++++++++-
 .../distributed/CacheExchangeMergeTest.java     | 149 ++++++++++++++++++-
 10 files changed, 393 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ccb855ea/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 1d8cfdf..5ac99f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -292,7 +292,7 @@ public class DiscoCache {
      * @param id Node ID.
      * @return Node.
      */
-    public @Nullable ClusterNode node(UUID id) {
+    @Nullable public ClusterNode node(UUID id) {
         return nodeMap.get(id);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ccb855ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0db5d6b..37bfa8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1339,7 +1339,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         throws IgniteCheckedException {
         final ExchangeDiscoveryEvents evts = fut.context().events();
 
-        log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + evts.discoveryCache().version()
+ ']');
+        log.info("mergeExchangesOnServerJoin [topVer=" + evts.discoveryCache().version()
+ ']');
 
         assert evts.serverJoin() && !evts.serverLeft();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ccb855ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 02cd758..b2cf940 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -302,6 +302,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         GridDhtPartitionExchangeId exchId = msg.exchangeId();
 
                         log.info("Waiting for coordinator initialization [node=" + node.id()
+
+                            ", nodeOrder=" + node.order() +
                             ", ver=" + (exchId != null ? exchId.topologyVersion() : null)
+ ']');
 
                         crdInitFut.listen(new CI1<IgniteInternalFuture>() {
@@ -1763,8 +1764,10 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
         this.exchMergeTestWaitVer = exchMergeTestWaitVer;
     }
 
-    public void mergeExchanges(GridDhtPartitionsExchangeFuture curFut, AffinityTopologyVersion
resVer)
+    public void mergeExchanges(final GridDhtPartitionsExchangeFuture curFut, GridDhtPartitionsFullMessage
msg)
         throws IgniteInterruptedCheckedException {
+        AffinityTopologyVersion resVer = msg.resultTopologyVersion();
+
         exchWorker.waitForExchangeFuture(resVer);
 
         for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
@@ -1782,10 +1785,19 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                 log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion()
+
                     ", mergedFut=" + fut.initialVersion() + ']');
 
+                DiscoveryEvent evt = fut.discoveryEvent();
+
                 curFut.context().events().addEvent(fut.initialVersion(),
                     fut.discoveryEvent(),
                     fut.discoCache());
 
+                if (evt.type() == EVT_NODE_JOINED) {
+                    final GridDhtPartitionsSingleMessage pendingMsg = fut.mergeJoinExchangeOnDone(curFut);
+
+                    if (pendingMsg != null)
+                        curFut.waitAndReplayToNode(evt.eventNode(), pendingMsg);
+                }
+
                 exchWorker.futQ.remove(fut);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ccb855ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a3a488a..9da0594 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1118,7 +1118,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
             if (stopping)
                 return false;
 
-            if (exchangeVer == null && !topReadyFut.isDone())
+            if (exchangeVer == null && (topReadyFut == null || !topReadyFut.isDone()))
                 return false;
 
             if (exchangeVer != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ccb855ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
index 0a49415..07daeda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
@@ -64,6 +64,17 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
 
     /**
      * @param nodeId Node ID.
+     * @param evt Event type.
+     * @param topVer Topology version.
+     */
+    public GridDhtPartitionExchangeId(UUID nodeId, int evt, AffinityTopologyVersion topVer)
{
+        this.nodeId = nodeId;
+        this.evt = evt;
+        this.topVer = topVer;
+    }
+
+    /**
+     * @param nodeId Node ID.
      * @param discoEvt Event.
      * @param topVer Topology version.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ccb855ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 8f9b5d3..34ebe41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1299,6 +1299,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private void sendAllPartitions(
         GridDhtPartitionsFullMessage msg,
         Collection<ClusterNode> nodes,
+        Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs,
         Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) {
         boolean singleNode = nodes.size() == 1;
 
@@ -1592,7 +1593,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param fut Current exchange to merge with.
      * @return {@code True} if need wait for message from joined server node.
      */
-    public boolean mergeJoinExchange(final GridDhtPartitionsExchangeFuture fut) {
+    public boolean mergeJoinExchange(GridDhtPartitionsExchangeFuture fut) {
         boolean wait;
 
         synchronized (this) {
@@ -1613,6 +1614,21 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         return wait;
     }
 
+    @Nullable public GridDhtPartitionsSingleMessage mergeJoinExchangeOnDone(GridDhtPartitionsExchangeFuture
fut) {
+        synchronized (this) {
+            assert !isDone();
+            assert !initFut.isDone();
+            assert mergedWith == null;
+            assert state == null;
+
+            state = ExchangeLocalState.MERGED;
+
+            mergedWith = fut;
+
+            return pendingJoinMsg;
+        }
+    }
+
     /**
      * @param node
      * @param msg
@@ -1626,28 +1642,43 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         boolean done = false;
 
+        FinishState finishState0 = null;
+
         synchronized (this) {
-            boolean process = mergedJoinExchMsgs != null &&
-                mergedJoinExchMsgs.containsKey(node.id()) &&
-                mergedJoinExchMsgs.get(node.id()) == null;
+            if (state == ExchangeLocalState.DONE) {
+                assert finishState != null;
 
-            log.info("Merge server join exchange, received message [curFut=" + initialVersion()
+
-                ", node=" + node.id() +
-                ", msgVer=" + msg.exchangeId().topologyVersion() +
-                ", process=" + process +
-                ", awaited=" + awaitMergedMsgs + ']');
+                finishState0 = finishState;
+            }
+            else {
+                boolean process = mergedJoinExchMsgs != null &&
+                    mergedJoinExchMsgs.containsKey(node.id()) &&
+                    mergedJoinExchMsgs.get(node.id()) == null;
 
-            if (process) {
-                mergedJoinExchMsgs.put(node.id(), msg);
+                log.info("Merge server join exchange, received message [curFut=" + initialVersion()
+
+                    ", node=" + node.id() +
+                    ", msgVer=" + msg.exchangeId().topologyVersion() +
+                    ", process=" + process +
+                    ", awaited=" + awaitMergedMsgs + ']');
 
-                assert awaitMergedMsgs > 0 : awaitMergedMsgs;
+                if (process) {
+                    mergedJoinExchMsgs.put(node.id(), msg);
 
-                awaitMergedMsgs--;
+                    assert awaitMergedMsgs > 0 : awaitMergedMsgs;
 
-                done = awaitMergedMsgs == 0;
+                    awaitMergedMsgs--;
+
+                    done = awaitMergedMsgs == 0;
+                }
             }
         }
 
+        if (finishState0 != null) {
+            sendAllPartitionsToNode(finishState0, msg, node.id());
+
+            return;
+        }
+
         if (done)
             finishExchangeOnCoordinator();
     }
@@ -1676,7 +1707,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             return;
         }
 
-
         if (!msg.client()) {
             assert msg.lastVersion() != null : msg;
 
@@ -1722,6 +1752,22 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         });
     }
 
+    public void waitAndReplayToNode(final ClusterNode node, final GridDhtPartitionsSingleMessage
msg) {
+        listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
fut) {
+                FinishState finishState0;
+
+                synchronized (GridDhtPartitionsExchangeFuture.this) {
+                    finishState0 = finishState;
+                }
+
+                assert finishState0 != null;
+
+                sendAllPartitionsToNode(finishState0, msg, node.id());
+            }
+        });
+    }
+
     /**
      * @param nodeId Node ID.
      * @param msg Client's message.
@@ -1754,7 +1800,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param nodeId Sender node.
      * @param msg Message.
      */
-    private void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
+    void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
         if (msg.client()) {
             waitAndReplyToClient(nodeId, msg);
 
@@ -2271,11 +2317,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             else {
                 List<ClusterNode> nodes;
 
+                Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs0;
+
                 synchronized (this) {
                     srvNodes.remove(cctx.localNode());
 
                     nodes = new ArrayList<>(srvNodes);
 
+                    mergedJoinExchMsgs0 = mergedJoinExchMsgs;
+
                     if (mergedJoinExchMsgs != null) {
                         for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet())
{
                             if (e.getValue() != null) {
@@ -2315,7 +2365,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
 
                 if (!nodes.isEmpty())
-                    sendAllPartitions(msg, nodes, joinedNodeAff);
+                    sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, joinedNodeAff);
 
                 onDone(exchCtx.events().topologyVersion(), err);
 
@@ -2497,11 +2547,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         if (msg.restoreState()) {
             try {
-                GridDhtPartitionsSingleMessage res = cctx.exchange().createPartitionsSingleMessage(msg.exchangeId(),
+                assert msg.restoreExchangeId() != null : msg;
+
+                GridDhtPartitionsSingleMessage res = cctx.exchange().createPartitionsSingleMessage(
+                    msg.restoreExchangeId(),
                     cctx.kernalContext().clientNode(),
                     true);
 
-                if (localJoinExchange())
+                if (localJoinExchange() && finishState0 == null)
                     res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
 
                 res.restoreState(true);
@@ -2599,7 +2652,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     log.info("Received full message, need merge [curFut=" + initialVersion()
+
                         ", resVer=" + msg.resultTopologyVersion() + ']');
 
-                    cctx.exchange().mergeExchanges(this, msg.resultTopologyVersion());
+                    cctx.exchange().mergeExchanges(this, msg);
                 }
 
                 if (localJoinExchange())
@@ -2972,11 +3025,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             boolean process = fullMsg == null;
 
-            Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs0 = newCrdFut.messages();
-
             assert msgs.isEmpty() : msgs;
 
-            for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : msgs0.entrySet())
{
+            for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : newCrdFut.messages().entrySet())
{
                 GridDhtPartitionsSingleMessage msg = e.getValue();
 
                 if (!msg.client()) {
@@ -2997,6 +3048,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     finishState = new FinishState(crd.id(), fullMsg.resultTopologyVersion(),
fullMsg);
                 }
 
+                fullMsg.exchangeId(exchId);
+
                 processFullMessage(false, null, fullMsg);
 
                 Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = newCrdFut.messages();
@@ -3017,7 +3070,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         }
                     }
 
-                    sendAllPartitions(fullMsg, msgs.keySet(), joinedNodeAff);
+                    sendAllPartitions(fullMsg, msgs.keySet(), newCrdFut.mergedJoinExchangeMessages(),
joinedNodeAff);
                 }
 
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ccb855ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 66fe1b5..0157a84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -160,11 +160,17 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         cp.resTopVer = resTopVer;
     }
 
+    /**
+     * @param resTopVer Result topology version.
+     */
     public void resultTopologyVersion(AffinityTopologyVersion resTopVer) {
         this.resTopVer = resTopVer;
     }
 
-    AffinityTopologyVersion resultTopologyVersion() {
+    /**
+     * @return Result topology version.
+     */
+    public AffinityTopologyVersion resultTopologyVersion() {
         return resTopVer;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ccb855ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index 4b80ee0..4f1cdc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -33,6 +33,9 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private GridDhtPartitionExchangeId restoreExchId;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -47,6 +50,29 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
         super(id, null);
     }
 
+    /**
+     * @param msgExchId Exchange ID for message.
+     * @param restoreExchId Initial exchange ID for current exchange.
+     * @return Message.
+     */
+    public static GridDhtPartitionsSingleRequest restoreStateRequest(GridDhtPartitionExchangeId
msgExchId, GridDhtPartitionExchangeId restoreExchId) {
+        GridDhtPartitionsSingleRequest msg = new GridDhtPartitionsSingleRequest(msgExchId);
+
+        msg.restoreState(true);
+
+        msg.restoreExchangeId(restoreExchId);
+
+        return msg;
+    }
+
+    public GridDhtPartitionExchangeId restoreExchangeId() {
+        return restoreExchId;
+    }
+
+    public void restoreExchangeId(GridDhtPartitionExchangeId restoreExchId) {
+        this.restoreExchId = restoreExchId;
+    }
+
     /** {@inheritDoc} */
     @Override public int handlerId() {
         return 0;
@@ -71,6 +97,15 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
             writer.onHeaderWritten();
         }
 
+        switch (writer.state()) {
+            case 5:
+                if (!writer.writeMessage("restoreExchId", restoreExchId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
         return true;
     }
 
@@ -84,6 +119,17 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
         if (!super.readFrom(buf, reader))
             return false;
 
+        switch (reader.state()) {
+            case 5:
+                restoreExchId = reader.readMessage("restoreExchId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
         return reader.afterMessageRead(GridDhtPartitionsSingleRequest.class);
     }
 
@@ -94,7 +140,7 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 6;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ccb855ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index 4f9eee3..916f688 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,10 +32,13 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+
 /**
  *
  */
@@ -49,12 +53,21 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
     private Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = new HashMap<>();
 
     /** */
+    private Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs;
+
+    /** */
     private GridFutureAdapter restoreStateFut;
 
     /** */
     private IgniteLogger log;
 
     /** */
+    private AffinityTopologyVersion initTopVer;
+
+    /** */
+    private Map<UUID, GridDhtPartitionExchangeId> extraNodes;
+
+    /** */
     // TODO IGNITE-5578 backward compatibility
     private boolean restoreState = true;
 
@@ -67,6 +80,8 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
      * @throws IgniteCheckedException If failed.
      */
     public void init(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException
{
+        initTopVer = exchFut.initialVersion();
+
         GridCacheSharedContext cctx = exchFut.sharedContext();
 
         log = cctx.logger(getClass());
@@ -78,6 +93,8 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
         if (fut != null)
             add(fut);
 
+        DiscoCache curDiscoCache = cctx.discovery().discoCache();
+
         DiscoCache discoCache = exchFut.discoCache();
 
         List<ClusterNode> nodes = new ArrayList<>();
@@ -91,6 +108,25 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
                 }
             }
 
+            if (!curDiscoCache.version().equals(discoCache.version())) {
+                for (ClusterNode node : curDiscoCache.allNodes()) {
+                    if (discoCache.node(node.id()) == null) {
+                        awaited.add(node.id());
+
+                        nodes.add(node);
+
+                        if (extraNodes == null)
+                            extraNodes = new HashMap<>();
+
+                        GridDhtPartitionExchangeId exchId = new GridDhtPartitionExchangeId(node.id(),
+                            EVT_NODE_JOINED,
+                            new AffinityTopologyVersion(node.order()));
+
+                        extraNodes.put(node.id(), exchId);
+                    }
+                }
+            }
+
             if (!awaited.isEmpty()) {
                 restoreStateFut = new GridFutureAdapter();
 
@@ -100,13 +136,20 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
 
         if (!nodes.isEmpty()) {
             // TODO IGNITE-5578: merged nodes.
-            GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchFut.exchangeId());
-
-            req.restoreState(true);
+            GridDhtPartitionsSingleRequest req = GridDhtPartitionsSingleRequest.restoreStateRequest(exchFut.exchangeId(),
+                exchFut.exchangeId());
 
             for (ClusterNode node : nodes) {
                 try {
-                    cctx.io().send(node, req, GridIoPolicy.SYSTEM_POOL);
+                    GridDhtPartitionsSingleRequest sndReq = req;
+
+                    if (extraNodes != null && extraNodes.containsKey(node.id()))
{
+                        sndReq = GridDhtPartitionsSingleRequest.restoreStateRequest(
+                            extraNodes.get(node.id()),
+                            exchFut.exchangeId());
+                    }
+
+                    cctx.io().send(node, sndReq, GridIoPolicy.SYSTEM_POOL);
                 }
                 catch (ClusterTopologyCheckedException e) {
                     if (log.isDebugEnabled())
@@ -127,6 +170,10 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
         return msgs;
     }
 
+    Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchangeMessages() {
+        return mergedJoinExchMsgs;
+    }
+
     /**
      * @return Full message is some of nodes received it from previous coordinator.
      */
@@ -139,6 +186,8 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
      * @param msg Message.
      */
     public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+        log.info("Init new coordinator, received response [node=" + node.id() + ']');
+
         assert msg.restoreState() : msg;
 
         boolean done = false;
@@ -152,21 +201,55 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
 
                     fullMsg  = fullMsg0;
                 }
-
-                msgs.put(node, msg);
+                else
+                    msgs.put(node, msg);
 
                 done = awaited.isEmpty();
             }
+
+            if (done)
+                onAllReceived();
         }
 
         if (done)
             restoreStateFut.onDone();
     }
 
+    private void onAllReceived() {
+        AffinityTopologyVersion resVer = fullMsg != null ? fullMsg.resultTopologyVersion()
: initTopVer;
+
+        for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>>
it = msgs.entrySet().iterator(); it.hasNext();) {
+            Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next();
+
+            GridDhtPartitionsSingleMessage msg = e.getValue();
+
+            GridDhtPartitionExchangeId msgVer = extraNodes != null ? extraNodes.get(e.getKey().id())
: null;
+
+            if (msgVer != null) {
+                if (msgVer.topologyVersion().compareTo(resVer) < 0) {
+                    it.remove();
+
+                    continue;
+                }
+
+                assert msgVer.topologyVersion().compareTo(initTopVer) > 0 : msgVer;
+
+                if (mergedJoinExchMsgs == null)
+                    mergedJoinExchMsgs = new HashMap<>();
+
+                msg.exchangeId(msgVer);
+
+                mergedJoinExchMsgs.put(e.getKey().id(), msg);
+            }
+        }
+    }
+
     /**
      * @param nodeId Failed node ID.
      */
     public void onNodeLeft(UUID nodeId) {
+        log.info("Init new coordinator, node left [node=" + nodeId + ']');
+
         boolean done;
 
         synchronized (this) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ccb855ea/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 8b51e47..7359bde 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -32,6 +32,9 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
@@ -40,12 +43,13 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -342,7 +346,100 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testMergeExchangeCoordinatorChange() throws Exception {
+    public void testMergeJoinExchangesCoordinatorChange1_4_servers() throws Exception {
+        for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+            mergeJoinExchangesCoordinatorChange1(4, mode);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeJoinExchangesCoordinatorChange1_8_servers() throws Exception {
+        for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+            mergeJoinExchangesCoordinatorChange1(8, mode);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param srvs Number of server nodes.
+     * @param mode Test mode.
+     * @throws Exception If failed.
+     */
+    private void mergeJoinExchangesCoordinatorChange1(final int srvs, CoordinatorChangeMode
mode)
+        throws Exception
+    {
+        testSpi = true;
+
+        Ignite srv0 = startGrids(srvs);
+
+        mergeExchangeWaitVersion(srv0, 6);
+
+        CountDownLatch latch = blockExchangeFinish(srvs, mode);
+
+        IgniteInternalFuture<?> fut = startGrids(srv0, srvs, 2);
+
+        if (latch != null && !latch.await(5, TimeUnit.SECONDS))
+            fail("Failed to wait for expected messages.");
+
+        stopGrid(getTestIgniteInstanceName(0), true, false);
+
+        fut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeJoinExchangesCoordinatorChange2_4_servers() throws Exception {
+        mergeJoinExchangeCoordinatorChange2(4, 2, F.asList(1, 2, 3, 4), F.asList(5));
+
+        stopAllGrids();
+
+        mergeJoinExchangeCoordinatorChange2(4, 2, F.asList(1, 2, 3, 5), F.asList(4));
+    }
+
+    /**
+     * @param srvs Number of server nodes.
+     * @param startNodes Number of nodes to start.
+     * @param blockNodes Nodes which do not receive messages.
+     * @param waitMsgNodes Nodes which should receive messages.
+     * @throws Exception If failed.
+     */
+    private void mergeJoinExchangeCoordinatorChange2(final int srvs,
+        final int startNodes,
+        List<Integer> blockNodes,
+        List<Integer> waitMsgNodes) throws Exception
+    {
+        testSpi = true;
+
+        Ignite srv0 = startGrids(srvs);
+
+        mergeExchangeWaitVersion(srv0, srvs + startNodes);
+
+        CountDownLatch latch = blockExchangeFinish(srv0, srvs + 1, blockNodes, waitMsgNodes);
+
+        IgniteInternalFuture<?> fut = startGrids(srv0, srvs, startNodes);
+
+        if (latch != null && !latch.await(5, TimeUnit.SECONDS))
+            fail("Failed to wait for expected messages.");
+
+        stopGrid(getTestIgniteInstanceName(0), true, false);
+
+        fut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeExchangeCoordinatorChange4() throws Exception {
         testSpi = true;
 
         final int srvs = 4;
@@ -353,7 +450,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         final AtomicInteger idx = new AtomicInteger(srvs);
 
-        CountDownLatch latch = blockExchangeFinish(srv0, 5, F.asList(2, 3, 4, 5), F.asList(1));
+        CountDownLatch latch = blockExchangeFinish(srv0, 5, F.asList(1, 2, 3, 4), F.asList(5));
 
         IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>()
{
             @Override public Void call() throws Exception {
@@ -663,6 +760,52 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param node Some existing node.
+     * @param startIdx Start node index.
+     * @param cnt Number of nodes.
+     * @return Start future.
+     * @throws Exception If failed.
+     */
+    private IgniteInternalFuture startGrids(Ignite node, int startIdx, int cnt) throws Exception
{
+        GridCompoundFuture fut = new GridCompoundFuture();
+
+        for (int i = 0; i < cnt; i++) {
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            node.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    log.info("Got event: " + ((DiscoveryEvent)evt).eventNode().id());
+
+                    latch.countDown();
+
+                    return false;
+                }
+            }, EventType.EVT_NODE_JOINED);
+
+            final int nodeIdx = startIdx + i;
+
+            IgniteInternalFuture fut0 = GridTestUtils.runAsync(new Callable() {
+                @Override public Object call() throws Exception {
+                    log.info("Start new node: " + nodeIdx);
+
+                    startGrid(nodeIdx);
+
+                    return null;
+                }
+            }, "start-node-" + nodeIdx);
+
+            if (!latch.await(5, TimeUnit.SECONDS))
+                fail();
+
+            fut.add(fut0);
+        }
+
+        fut.markInitialized();
+
+        return fut;
+    }
+
+    /**
      *
      */
     enum CoordinatorChangeMode {


Mime
View raw message