ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4154
Date Tue, 01 Nov 2016 11:10:08 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4154 2825efcdf -> 8cce0a32e


ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8cce0a32
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8cce0a32
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8cce0a32

Branch: refs/heads/ignite-4154
Commit: 8cce0a32e744357ce1dc2304d2abab87baf20fe0
Parents: 2825efc
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Nov 1 14:02:49 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Nov 1 14:02:49 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 76 +++++++++++++++++++-
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  7 ++
 2 files changed, 82 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8cce0a32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3acab40..d7b4d09 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1872,7 +1872,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             assert spi.ensured(msg) && msg.verified() : msg;
 
             if (msg instanceof TcpDiscoveryNodeAddedMessage) {
-                TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+                TcpDiscoveryNodeAddedMessage addedMsg =
+                    new TcpDiscoveryNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
+
+                msg = addedMsg;
 
                 TcpDiscoveryNode node = addedMsg.node();
 
@@ -1890,12 +1893,83 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     addedMsg.clientTopology(top);
                 }
+
+                // Do not need this data for client reconnect.
+                addedMsg.oldNodesDiscoveryData(null);
+            }
+            else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+                TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg;
+
+                if (addFinishMsg.clientDiscoData() != null) {
+                    Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData();
+
+                    Set<UUID> replaced = null;
+
+                    for (TcpDiscoveryAbstractMessage msg0 : msgs) {
+                        if (msg0 instanceof TcpDiscoveryNodeAddFinishedMessage) {
+                            Map<UUID, Map<Integer, byte[]>> existingDiscoData =
+                                ((TcpDiscoveryNodeAddFinishedMessage)msg0).clientDiscoData();
+
+                            // Check if already stored message contains the same data to do not store copies multiple times.
+                            if (existingDiscoData != null) {
+                                for (Map.Entry<UUID, Map<Integer, byte[]>> e : discoData.entrySet()) {
+                                    UUID nodeId = e.getKey();
+
+                                    if (F.contains(replaced, nodeId))
+                                        continue;
+
+                                    Map<Integer, byte[]> existingData = existingDiscoData.get(e.getKey());
+
+                                    if (existingData != null && mapsEqual(e.getValue(), existingData)) {
+                                        e.setValue(existingData);
+
+                                        if (replaced == null)
+                                            replaced = new HashSet<>();
+
+                                        boolean add = replaced.add(nodeId);
+
+                                        assert add;
+
+                                        if (replaced.size() == discoData.size())
+                                            break;
+                                    }
+                                }
+
+                                if (replaced != null && replaced.size() == discoData.size())
+                                    break;
+                            }
+                        }
+                    }
+                }
             }
 
             msgs.add(msg);
         }
 
         /**
+         * @param m1 Map 1.
+         * @param m2 Map 2.
+         * @return {@code True} if maps contain the same data.
+         */
+        private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) {
+            if (m1 == m2)
+                return true;
+
+            if (m1.size() == m2.size()) {
+                for (Map.Entry<Integer, byte[]> e : m1.entrySet()) {
+                    byte[] data = m2.get(e.getKey());
+
+                    if (!Arrays.equals(e.getValue(), data))
+                        return false;
+                }
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
          * Gets messages starting from provided ID (exclusive). If such
          * message is not found, {@code null} is returned (this indicates
          * a failure condition when it was already removed from queue).

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cce0a32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 7f7134f..bd52c04 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -236,6 +236,13 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @param oldNodesDiscoData Discovery data from old nodes.
+     */
+    public void oldNodesDiscoveryData(Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData) {
+        this.oldNodesDiscoData = oldNodesDiscoData;
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param discoData Discovery data to add.
      */


Mime
View raw message