ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [7/7] ignite git commit: ignite-comm-opts1
Date Fri, 09 Sep 2016 14:01:55 GMT
ignite-comm-opts1


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/38965d3d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/38965d3d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/38965d3d

Branch: refs/heads/ignite-comm-opts1
Commit: 38965d3d3e83831bf526f34a6cc0a6fe4289e524
Parents: c81e0d9
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Sep 9 17:01:25 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Sep 9 17:01:25 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  40 +--
 .../util/nio/GridNioRecoveryDescriptor.java     |   1 +
 .../ignite/internal/util/nio/GridNioServer.java |  14 +-
 .../internal/util/nio/GridNioSession.java       |  14 +-
 .../internal/util/nio/GridNioSessionImpl.java   |  14 +-
 .../util/nio/GridSelectorNioSessionImpl.java    |  44 ++-
 .../communication/tcp/TcpCommunicationSpi.java  | 358 +++++++++++++------
 .../nio/impl/GridNioFilterChainSelfTest.java    |  14 +-
 ...cpCommunicationSpiMultithreadedSelfTest.java |   2 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   2 +-
 ...CommunicationRecoveryAckClosureSelfTest.java |   2 +-
 11 files changed, 340 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/38965d3d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9e547ca..f869c5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -529,26 +529,26 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
         }
 
-        Thread t = new Thread(
-            new Runnable() {
-                @Override public void run() {
-                    for (;;) {
-                        try {
-                            Thread.sleep(5000);
-                        }
-                        catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-
-                        dumpStats();
-                    }
-                }
-            }
-        );
-
-        t.setDaemon(true);
-
-        t.start();
+//        Thread t = new Thread(
+//            new Runnable() {
+//                @Override public void run() {
+//                    for (;;) {
+//                        try {
+//                            Thread.sleep(5000);
+//                        }
+//                        catch (InterruptedException e) {
+//                            e.printStackTrace();
+//                        }
+//
+//                        dumpStats();
+//                    }
+//                }
+//            }
+//        );
+//
+//        t.setDaemon(true);
+//
+//        t.start();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/38965d3d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 35480ac..bf8e26a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -31,6 +31,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Recovery information for single node.
  */
+@Deprecated // To be splitted into in/out parts when do need maintain backward compatibility.
 public class GridNioRecoveryDescriptor {
     /** Number of acknowledged messages. */
     private long acked;

http://git-wip-us.apache.org/repos/asf/ignite/blob/38965d3d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a2449f8..18caea6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -521,7 +521,7 @@ public class GridNioServer<T> {
     public void resend(GridNioSession ses) {
         assert ses instanceof GridSelectorNioSessionImpl;
 
-        GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+        GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
 
         if (recoveryDesc != null && !recoveryDesc.messagesFutures().isEmpty()) {
             Deque<GridNioFuture<?>> futs = recoveryDesc.messagesFutures();
@@ -695,7 +695,7 @@ public class GridNioServer<T> {
         assert req.operation() == NioOperation.REGISTER;
         assert req.socketChannel() != null;
 
-        U.debug("Req registration: " + req);
+        //U.debug("Req registration: " + req);
 
         int balanceIdx = req.accepted() ? readBalanceIdx.getAndAdd(2) : writeBalanceIdx.getAndAdd(2);
 
@@ -1531,7 +1531,8 @@ public class GridNioServer<T> {
                                         .append("rmtAddr=").append(ses.remoteAddress())
                                         .append(", locAddr=").append(ses.localAddress());
 
-                                    GridNioRecoveryDescriptor desc = ses.recoveryDescriptor();
+                                    // TODO
+                                    GridNioRecoveryDescriptor desc = ses.outRecoveryDescriptor();
 
                                     if (desc != null) {
                                         sb.append(", msgsSent=").append(desc.sent())
@@ -1900,7 +1901,7 @@ public class GridNioServer<T> {
                 // Since ses is in closed state, no write requests will be added.
                 NioOperationFuture<?> fut = ses.removeMeta(NIO_OPERATION.ordinal());
 
-                GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+                GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();
 
                 if (recovery != null) {
                     try {
@@ -1922,6 +1923,11 @@ public class GridNioServer<T> {
                         fut.connectionClosed();
                 }
 
+                GridNioRecoveryDescriptor inRecovery = ses.inRecoveryDescriptor();
+
+                if (inRecovery != null && inRecovery != recovery)
+                    inRecovery.release();
+
                 try {
                     filterChain.onSessionClosed(ses);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/38965d3d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index e4a7225..1e427d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -158,10 +158,20 @@ public interface GridNioSession {
     /**
      * @param recoveryDesc Recovery descriptor.
      */
-    public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+    public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+
+    /**
+     * @param recoveryDesc Recovery descriptor.
+     */
+    public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+
+    /**
+     * @return Recovery descriptor if recovery is supported, {@code null otherwise.}
+     */
+    @Nullable public GridNioRecoveryDescriptor outRecoveryDescriptor();
 
     /**
      * @return Recovery descriptor if recovery is supported, {@code null otherwise.}
      */
-    @Nullable public GridNioRecoveryDescriptor recoveryDescriptor();
+    @Nullable public GridNioRecoveryDescriptor inRecoveryDescriptor();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/38965d3d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 5d023ca..3f5d367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -346,12 +346,22 @@ public class GridNioSessionImpl implements GridNioSession {
     }
 
     /** {@inheritDoc} */
-    @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+    @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+    @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/38965d3d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index d989ed6..8fddd10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -59,8 +59,11 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Read buffer. */
     private ByteBuffer readBuf;
 
-    /** Recovery data. */
-    private GridNioRecoveryDescriptor recovery;
+    /** Incoming recovery data. */
+    private GridNioRecoveryDescriptor inRecovery;
+
+    /** Outgoing recovery data. */
+    private GridNioRecoveryDescriptor outRecovery;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -232,17 +235,17 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
             if (sem != null && !last.messageThread())
                 sem.release();
 
-            if (recovery != null) {
-                if (!recovery.add(last)) {
+            if (outRecovery != null) {
+                if (!outRecovery.add(last)) {
                     LT.warn(log, null, "Unacknowledged messages queue size overflow, will
attempt to reconnect " +
                         "[remoteAddr=" + remoteAddress() +
-                        ", queueLimit=" + recovery.queueLimit() + ']');
+                        ", queueLimit=" + outRecovery.queueLimit() + ']');
 
                     if (log.isDebugEnabled())
                         log.debug("Unacknowledged messages queue size overflow, will attempt
to reconnect " +
                             "[remoteAddr=" + remoteAddress() +
-                            ", queueSize=" + recovery.messagesFutures().size() +
-                            ", queueLimit=" + recovery.queueLimit() + ']');
+                            ", queueSize=" + outRecovery.messagesFutures().size() +
+                            ", queueLimit=" + outRecovery.queueLimit() + ']');
 
                     close();
                 }
@@ -279,24 +282,35 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     }
 
     /** {@inheritDoc} */
-    @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+    @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+        assert recoveryDesc != null;
+
+        outRecovery = recoveryDesc;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
+        return outRecovery;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
         assert recoveryDesc != null;
 
-        recovery = recoveryDesc;
+        inRecovery = recoveryDesc;
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
-        return recovery;
+    @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
+        return inRecovery;
     }
 
     /** {@inheritDoc} */
     @Override public <T> T addMeta(int key, @Nullable T val) {
-        if (val instanceof GridNioRecoveryDescriptor) {
-            recovery = (GridNioRecoveryDescriptor)val;
+        if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
+            outRecovery = (GridNioRecoveryDescriptor)val;
 
-            if (!accepted())
-                recovery.connected();
+            outRecovery.connected();
 
             return null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/38965d3d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 3292412..97c75c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -102,6 +102,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -236,6 +237,9 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 @IgniteSpiConsistencyChecked(optional = false)
 public class TcpCommunicationSpi extends IgniteSpiAdapter
     implements CommunicationSpi<Message>, TcpCommunicationSpiMBean {
+    /** */
+    private static final IgniteProductVersion TWO_CONN_SINCE_VER = IgniteProductVersion.fromString("1.8.0");
+
     /** IPC error message. */
     public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory
segment " +
         "(switching to TCP, may be slower).";
@@ -365,7 +369,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (!stopping) {
                         boolean reconnect = false;
 
-                        GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor();
+                        GridNioRecoveryDescriptor recoveryData = ses.outRecoveryDescriptor();
 
                         if (recoveryData != null) {
                             if (recoveryData.nodeAlive(getSpiContext().node(id))) {
@@ -427,105 +431,116 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     return;
                 }
 
-//                ClusterNode locNode = getSpiContext().localNode();
-//
-//                if (ses.remoteAddress() == null)
-//                    return;
-//
-//                GridCommunicationClient oldClient = clients.get(sndId);
-//
-                boolean hasShmemClient = false;
-//
-//                if (oldClient != null) {
-//                    if (oldClient instanceof GridTcpNioCommunicationClient) {
-//                        if (log.isDebugEnabled())
-//                            log.debug("Received incoming connection when already connected
" +
-//                                    "to this node, rejecting [locNode=" + locNode.id()
+
-//                                    ", rmtNode=" + sndId + ']');
-//
-//                        ses.send(new RecoveryLastReceivedMessage(-1));
-//
-//                        return;
-//                    }
-//                    else {
-//                        assert oldClient instanceof GridShmemCommunicationClient;
-//
-//                        hasShmemClient = true;
-//                    }
-//                }
-//
-                GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
-//
-//                GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId,
fut);
-//
+                ClusterNode locNode = getSpiContext().localNode();
+
+                if (ses.remoteAddress() == null)
+                    return;
+
                 assert msg instanceof HandshakeMessage : msg;
 
                 HandshakeMessage msg0 = (HandshakeMessage)msg;
-//
-                final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
-//
-//                if (oldFut == null) {
-//                    oldClient = clients.get(sndId);
-//
-//                    if (oldClient != null) {
-//                        if (oldClient instanceof GridTcpNioCommunicationClient) {
-//                            if (log.isDebugEnabled())
-//                                log.debug("Received incoming connection when already connected
" +
-//                                        "to this node, rejecting [locNode=" + locNode.id()
+
-//                                        ", rmtNode=" + sndId + ']');
-//
-//                            ses.send(new RecoveryLastReceivedMessage(-1));
-//
-//                            fut.onDone(oldClient);
-//
-//                            return;
-//                        }
-//                        else {
-//                            assert oldClient instanceof GridShmemCommunicationClient;
-//
-//                            hasShmemClient = true;
-//                        }
-//                    }
-//
-//                    boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
-//                            new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient,
fut));
-//
-//                    if (log.isDebugEnabled())
-//                        log.debug("Received incoming connection from remote node " +
-//                                "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved +
']');
-//
-//                    if (reserved) {
-//                        try {
-//                            GridTcpNioCommunicationClient client =
-//                                    connected(recoveryDesc, ses, rmtNode, msg0.received(),
true, !hasShmemClient);
-//
-//                            fut.onDone(client);
-//                        }
-//                        finally {
-//                            clientFuts.remove(rmtNode.id(), fut);
-//                        }
-//                    }
-//                }
-//                else {
-//                    if (oldFut instanceof ConnectFuture && locNode.order() <
rmtNode.order()) {
-//                        if (log.isDebugEnabled()) {
-//                            log.debug("Received incoming connection from remote node while
" +
-//                                    "connecting to this node, rejecting [locNode=" + locNode.id()
+
-//                                    ", locNodeOrder=" + locNode.order() + ", rmtNode="
+ rmtNode.id() +
-//                                    ", rmtNodeOrder=" + rmtNode.order() + ']');
-//                        }
-//
-//                        ses.send(new RecoveryLastReceivedMessage(-1));
-//                    }
-//                    else {
-                        // The code below causes a race condition between shmem and TCP (see
IGNITE-1294)
+
+                if (twoConnections(rmtNode)) {
+                    final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode);
+
+                    boolean reserve = recoveryDesc.tryReserve(msg0.connectCount(),
+                        new ConnectClosureNew(ses, recoveryDesc, rmtNode, msg0));
+
+                    if (reserve)
+                        connectedNew(recoveryDesc, ses, msg0.received(), true);
+                }
+                else {
+                    GridCommunicationClient oldClient = clients.get(sndId);
+
+                    boolean hasShmemClient = false;
+
+                    if (oldClient != null) {
+                        if (oldClient instanceof GridTcpNioCommunicationClient) {
+                            if (log.isDebugEnabled())
+                                log.debug("Received incoming connection when already connected
" +
+                                    "to this node, rejecting [locNode=" + locNode.id() +
+                                    ", rmtNode=" + sndId + ']');
+
+                            ses.send(new RecoveryLastReceivedMessage(-1));
+
+                            return;
+                        }
+                        else {
+                            assert oldClient instanceof GridShmemCommunicationClient;
+
+                            hasShmemClient = true;
+                        }
+                    }
+
+                    GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
+
+                    GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId,
fut);
+
+                    final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode);
+
+                    if (oldFut == null) {
+                        oldClient = clients.get(sndId);
+
+                        if (oldClient != null) {
+                            if (oldClient instanceof GridTcpNioCommunicationClient) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Received incoming connection when already
connected " +
+                                        "to this node, rejecting [locNode=" + locNode.id()
+
+                                        ", rmtNode=" + sndId + ']');
+
+                                ses.send(new RecoveryLastReceivedMessage(-1));
+
+                                fut.onDone(oldClient);
+
+                                return;
+                            }
+                            else {
+                                assert oldClient instanceof GridShmemCommunicationClient;
+
+                                hasShmemClient = true;
+                            }
+                        }
+
                         boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+                            new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient,
fut));
+
+                        if (log.isDebugEnabled())
+                            log.debug("Received incoming connection from remote node " +
+                                "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']');
+
+                        if (reserved) {
+                            try {
+                                GridTcpNioCommunicationClient client =
+                                    connected(recoveryDesc, ses, rmtNode, msg0.received(),
true, !hasShmemClient);
+
+                                fut.onDone(client);
+                            }
+                            finally {
+                                clientFuts.remove(rmtNode.id(), fut);
+                            }
+                        }
+                    }
+                    else {
+                        if (oldFut instanceof ConnectFuture && locNode.order() <
rmtNode.order()) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Received incoming connection from remote node
while " +
+                                    "connecting to this node, rejecting [locNode=" + locNode.id()
+
+                                    ", locNodeOrder=" + locNode.order() + ", rmtNode=" +
rmtNode.id() +
+                                    ", rmtNodeOrder=" + rmtNode.order() + ']');
+                            }
+
+                            ses.send(new RecoveryLastReceivedMessage(-1));
+                        }
+                        else {
+                            // The code below causes a race condition between shmem and TCP
(see IGNITE-1294)
+                            boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
                                 new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient,
fut));
 
-                        if (reserved)
-                            connected(recoveryDesc, ses, rmtNode, msg0.received(), true,
false);
-//                    }
-//                }
+                            if (reserved)
+                                connected(recoveryDesc, ses, rmtNode, msg0.received(), true,
!hasShmemClient);
+                        }
+                    }
+                }
             }
 
             @Override public void onMessage(GridNioSession ses, Message msg) {
@@ -553,10 +568,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 else {
                     rcvdMsgsCnt.increment();
 
-                    GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+                    if (msg instanceof RecoveryLastReceivedMessage) {
+                        GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();
 
-                    if (recovery != null) {
-                        if (msg instanceof RecoveryLastReceivedMessage) {
+                        if (recovery != null) {
                             RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
 
                             if (log.isDebugEnabled())
@@ -567,7 +582,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                             return;
                         }
-                        else {
+                    }
+                    else {
+                        GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
+
+                        if (recovery != null) {
                             long rcvCnt = recovery.onReceived();
 
                             if (rcvCnt % ackSndThreshold == 0) {
@@ -623,7 +642,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 boolean createClient) {
                 recovery.onHandshake(rcvCnt);
 
-                ses.recoveryDescriptor(recovery);
+                ses.inRecoveryDescriptor(recovery);
 
                 nioSrvr.resend(ses);
 
@@ -647,6 +666,87 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             /**
+             * @param recovery Recovery descriptor.
+             * @param ses Session.
+             * @param rcvCnt Number of received messages..
+             * @param sndRes If {@code true} sends response for recovery handshake.
+             */
+            private void connectedNew(
+                GridNioRecoveryDescriptor recovery,
+                GridNioSession ses,
+                long rcvCnt,
+                boolean sndRes) {
+                recovery.onHandshake(rcvCnt);
+
+                ses.inRecoveryDescriptor(recovery);
+
+                if (sndRes)
+                    nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+
+                recovery.connected();
+            }
+
+            /**
+             *
+             */
+            class ConnectClosureNew implements IgniteInClosure<Boolean> {
+                /** */
+                private static final long serialVersionUID = 0L;
+
+                /** */
+                private final GridNioSession ses;
+
+                /** */
+                private final GridNioRecoveryDescriptor recoveryDesc;
+
+                /** */
+                private final ClusterNode rmtNode;
+
+                /** */
+                private final HandshakeMessage msg;
+
+                /**
+                 * @param ses Incoming session.
+                 * @param recoveryDesc Recovery descriptor.
+                 * @param rmtNode Remote node.
+                 * @param msg Handshake message.
+                 */
+                ConnectClosureNew(GridNioSession ses,
+                    GridNioRecoveryDescriptor recoveryDesc,
+                    ClusterNode rmtNode,
+                    HandshakeMessage msg) {
+                    this.ses = ses;
+                    this.recoveryDesc = recoveryDesc;
+                    this.rmtNode = rmtNode;
+                    this.msg = msg;
+                }
+
+                /** {@inheritDoc} */
+                @Override public void apply(Boolean success) {
+                    if (success) {
+                        IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>()
{
+                            @Override public void apply(IgniteInternalFuture<?> msgFut)
{
+                                try {
+                                    msgFut.get();
+
+                                    connectedNew(recoveryDesc, ses, msg.received(), false);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to send recovery handshake " +
+                                            "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+
+                                    recoveryDesc.release();
+                                }
+                            }
+                        };
+
+                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()),
lsnr);
+                    }
+                }
+            }
+
+            /**
              *
              */
             @SuppressWarnings("PackageVisibleInnerClass")
@@ -867,6 +967,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** */
     private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs
= GridConcurrentFactory.newMap();
 
+    /** */
+    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> outRecDescs =
GridConcurrentFactory.newMap();
+
+    /** */
+    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> inRecDescs =
GridConcurrentFactory.newMap();
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -2364,7 +2470,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             "(node left topology): " + node);
                     }
 
-                    GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node);
+                    GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node);
 
                     if (!recoveryDesc.reserve()) {
                         U.closeQuiet(ch);
@@ -2806,34 +2912,52 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             client.forceClose();
     }
 
+    private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node) {
+        if (twoConnections(node))
+            return recoveryDescriptor(outRecDescs, node);
+        else
+            return recoveryDescriptor(recoveryDescs, node);
+    }
+
+    private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node) {
+        if (twoConnections(node))
+            return recoveryDescriptor(inRecDescs, node);
+        else
+            return recoveryDescriptor(recoveryDescs, node);
+    }
+
+    /**
+     * @param node Node.
+     * @return {@code True} if given node supports two connectios.
+     */
+    private boolean twoConnections(ClusterNode node) {
+        return TWO_CONN_SINCE_VER.compareToIgnoreTimestamp(node.version()) <= 0;
+    }
+
     /**
      * @param node Node.
      * @return Recovery receive data for given node.
      */
-    private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) {
+    private GridNioRecoveryDescriptor recoveryDescriptor(
+        ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs,
+        ClusterNode node) {
         ClientKey id = new ClientKey(node.id(), node.order());
 
         GridNioRecoveryDescriptor recovery = recoveryDescs.get(id);
 
-//        if (recovery == null) {
-//            int maxSize = Math.max(msgQueueLimit, ackSndThreshold);
-//
-//            int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize *
5);
-//
-//            GridNioRecoveryDescriptor old =
-//                recoveryDescs.putIfAbsent(id, recovery = new GridNioRecoveryDescriptor(queueLimit,
node, log));
-//
-//            if (old != null)
-//                recovery = old;
-//        }
+        if (recovery == null) {
+            int maxSize = Math.max(msgQueueLimit, ackSndThreshold);
 
-        int maxSize = Math.max(
-            msgQueueLimit,
-            ackSndThreshold);
+            int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5);
 
-        int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5);
+            GridNioRecoveryDescriptor old =
+                recoveryDescs.putIfAbsent(id, recovery = new GridNioRecoveryDescriptor(queueLimit,
node, log));
+
+            if (old != null)
+                recovery = old;
+        }
 
-        return new GridNioRecoveryDescriptor(queueLimit, node, log);
+        return recovery;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/38965d3d/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
index 201fd27..58b91e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
@@ -369,12 +369,22 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest
{
         }
 
         /** {@inheritDoc} */
-        @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc)
{
+        @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc)
{
             // No-op.
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+        @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc)
{
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
             return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/38965d3d/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 7bbf531..f210bec 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -370,7 +370,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
             Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv,
"sessions");
 
             for (GridNioSession ses : sessions) {
-                final GridNioRecoveryDescriptor snd = ses.recoveryDescriptor();
+                final GridNioRecoveryDescriptor snd = ses.outRecoveryDescriptor();
 
                 if (snd != null) {
                     GridTestUtils.waitForCondition(new GridAbsPredicate() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/38965d3d/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 34872c6..fb2dfd7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -173,7 +173,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
                     boolean found = false;
 
                     for (GridNioSession ses : sessions) {
-                        final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+                        final GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
 
                         if (recoveryDesc != null) {
                             found = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/38965d3d/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index 25e3611..e153fe2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -187,7 +187,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends
Communic
                     boolean found = false;
 
                     for (GridNioSession ses : sessions) {
-                        final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+                        final GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
 
                         if (recoveryDesc != null) {
                             found = true;


Mime
View raw message