ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject [2/2] incubator-ignite git commit: # IGNITE-709 Implement reconnect.
Date Fri, 24 Apr 2015 17:39:52 GMT
# IGNITE-709 Implement reconnect.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fd778e61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fd778e61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fd778e61

Branch: refs/heads/ignite-709_2
Commit: fd778e617f7835b51f8879a86761ee8036e0a915
Parents: 65fc3e5
Author: sevdokimov <sevdokimov@gridgain.com>
Authored: Fri Apr 24 20:38:56 2015 +0300
Committer: sevdokimov <sevdokimov@gridgain.com>
Committed: Fri Apr 24 20:38:56 2015 +0300

----------------------------------------------------------------------
 .../discovery/tcp/TcpClientDiscoverySpi.java    | 1027 ++++++++++--------
 1 file changed, 591 insertions(+), 436 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd778e61/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 5f2de5f..ff0cb60 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -21,7 +21,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -62,20 +61,32 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
     /** Default disconnect check interval. */
     public static final long DFLT_DISCONNECT_CHECK_INT = 2000;
 
+    /** */
+    private static final Object JOIN_TIMEOUT = "JOIN_TIMEOUT";
+
+    /** */
+    private static final Object RECONNECT_TIMEOUT = "RECONNECT_TIMEOUT";
+
+    /** */
+    private static final Object SPI_STOP = "SPI_STOP";
+
+    /** */
+    private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
+
+    /** */
+    private static final Object SPI_RECONNECT_SUCCESS = "SPI_RECONNECT_SUCCESS";
+
     /** Remote nodes. */
     private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
 
-    /** Socket. */
-    private volatile Socket sock;
-
-    /** Socket reader. */
-    private volatile SocketReader sockRdr;
+    /** Socket writer. */
+    private SocketWriter sockWriter;
 
-    /** Heartbeat sender. */
-    private volatile HeartbeatSender hbSender;
+    /** */
+    private SocketReader sockReader;
 
-    /** Disconnect handler. */
-    private DisconnectHandler disconnectHnd;
+    /** */
+    private boolean segmentation;
 
     /** Last message ID. */
     private volatile IgniteUuid lastMsgId;
@@ -83,21 +94,24 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
     /** Current topology version. */
     private volatile long topVer;
 
-    /** Join error. */
+    /** Join error. Holds the error that occurred during the join process, if any. */
     private IgniteSpiException joinErr;
 
-    /** Whether reconnect failed. */
-    private boolean reconFailed;
-
     /** Joined latch. */
-    private CountDownLatch joinLatch;
+    private final CountDownLatch joinLatch = new CountDownLatch(1);
 
     /** Left latch. */
-    private volatile CountDownLatch leaveLatch;
+    private final CountDownLatch leaveLatch = new CountDownLatch(1);
 
     /** Disconnect check interval. */
     private long disconnectCheckInt = DFLT_DISCONNECT_CHECK_INT;
 
+    /** */
+    private final Timer timer = new Timer("TcpClientDiscoverySpi.timer");
+
+    /** */
+    private MessageWorker msgWorker;
+
     /** {@inheritDoc} */
     @Override public long getDisconnectCheckInterval() {
         return disconnectCheckInt;
@@ -145,9 +159,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
 
     /** {@inheritDoc} */
     @Override public int getMessageWorkerQueueSize() {
-        SocketReader sockRdr0 = sockRdr;
-
-        return sockRdr0 != null ? sockRdr0.msgWrk.queueSize() : 0;
+        return msgWorker.queueSize();
     }
 
     /** {@inheritDoc} */
@@ -267,13 +279,29 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
         locNode.setAttributes(locNodeAttrs);
         locNode.local(true);
 
+        sockWriter = new SocketWriter();
+        sockWriter.start();
+
+        sockReader = new SocketReader();
+        sockReader.start();
+
         sockTimeoutWorker = new SocketTimeoutWorker();
         sockTimeoutWorker.start();
 
-        joinTopology(false);
+        msgWorker = new MessageWorker();
+        msgWorker.start();
 
-        disconnectHnd = new DisconnectHandler();
-        disconnectHnd.start();
+        try {
+            joinLatch.await();
+
+            if (joinErr != null)
+                throw joinErr;
+        }
+        catch (InterruptedException e) {
+            throw new IgniteSpiException("Thread has been interrupted.", e);
+        }
+
+        timer.schedule(new HeartbeatSender(), hbFreq, hbFreq);
 
         if (log.isDebugEnabled())
             log.debug(startInfo());
@@ -281,48 +309,35 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
 
     /** {@inheritDoc} */
     @Override public void spiStop() throws IgniteSpiException {
-        rmtNodes.clear();
+        timer.cancel();
 
-        U.interrupt(disconnectHnd);
-        U.join(disconnectHnd, log);
-
-        U.interrupt(hbSender);
-        U.join(hbSender, log);
-
-        Socket sock0 = sock;
-
-        sock = null;
-
-        if (sock0 != null) {
-            leaveLatch = new CountDownLatch(1);
+        if (msgWorker.isAlive()) { // Should always be alive
+            msgWorker.addMessage(SPI_STOP);
 
             try {
-                TcpDiscoveryNodeLeftMessage msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
-
-                msg.client(true);
+                if (!leaveLatch.await(10000, MILLISECONDS)) {
+                    System.out.println("leaveLatch Timeout!!!!");
 
-                writeToSocket(sock0, msg);
-
-                if (!U.await(leaveLatch, netTimeout, MILLISECONDS)) {
                     if (log.isDebugEnabled())
-                        log.debug("Did not receive node left message for local node (will stop anyway) [sock=" +
-                            sock0 + ']');
+                        U.error(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
                 }
             }
-            catch (IOException | IgniteCheckedException e) {
-                if (log.isDebugEnabled())
-                    U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock0 + ']', e);
-            }
-            finally {
-                U.closeQuiet(sock0);
+            catch (InterruptedException ignored) {
+
             }
         }
 
-        U.interrupt(sockRdr);
-        U.join(sockRdr, log);
+        rmtNodes.clear();
 
         U.interrupt(sockTimeoutWorker);
+        U.interrupt(msgWorker);
+        U.interrupt(sockWriter);
+        U.interrupt(sockReader);
+
+        U.join(msgWorker, log);
         U.join(sockTimeoutWorker, log);
+        U.join(sockWriter, log);
+        U.join(sockReader, log);
 
         unregisterMBean();
 
@@ -379,7 +394,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(Serializable evt) {
         try {
-            sockRdr.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt)));
+            sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt)));
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -394,165 +409,105 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
             TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
                 node.id(), node.order());
 
-            sockRdr.addMessage(msg);
+            msgWorker.addMessage(msg);
         }
     }
 
     /**
-     * @param recon Reconnect flag.
-     * @return Whether joined successfully.
-     * @throws IgniteSpiException In case of error.
+     *
      */
-    private boolean joinTopology(boolean recon) throws IgniteSpiException {
-        if (!recon)
-            stats.onJoinStarted();
-
+    @NotNull
+    private Socket joinTopology(boolean recon) throws IgniteSpiException, InterruptedException {
         Collection<InetSocketAddress> addrs = null;
 
-        while (!Thread.currentThread().isInterrupted()) {
-            try {
-                while (addrs == null || addrs.isEmpty()) {
-                    addrs = resolvedAddresses();
+        while (true) {
+            if (Thread.currentThread().isInterrupted())
+                throw new InterruptedException();
 
-                    if (!F.isEmpty(addrs)) {
-                        if (log.isDebugEnabled())
-                            log.debug("Resolved addresses from IP finder: " + addrs);
-                    }
-                    else {
-                        U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + ipFinder);
+            while (addrs == null || addrs.isEmpty()) {
+                addrs = resolvedAddresses();
 
-                        U.sleep(2000);
-                    }
+                if (!F.isEmpty(addrs)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Resolved addresses from IP finder: " + addrs);
                 }
+                else {
+                    U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + ipFinder);
 
-                Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
-
-                Iterator<InetSocketAddress> it = addrs.iterator();
-
-                while (it.hasNext() && !Thread.currentThread().isInterrupted()) {
-                    InetSocketAddress addr = it.next();
-
-                    Socket sock = null;
-
-                    try {
-                        long ts = U.currentTimeMillis();
-
-                        IgniteBiTuple<Socket, UUID> t = initConnection(addr);
-
-                        sock = t.get1();
-
-                        UUID rmtNodeId = t.get2();
-
-                        stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
-
-                        locNode.clientRouterNodeId(rmtNodeId);
-
-                        TcpDiscoveryAbstractMessage msg = recon ?
-                            new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId,
-                                lastMsgId) :
-                            new TcpDiscoveryJoinRequestMessage(locNode, null);
-
-                        msg.client(true);
-
-                        writeToSocket(sock, msg);
-
-                        int res = readReceipt(sock, ackTimeout);
-
-                        switch (res) {
-                            case RES_OK:
-                                this.sock = sock;
-
-                                sockRdr = new SocketReader(rmtNodeId, new MessageWorker(recon));
-                                sockRdr.start();
-
-                                if (U.await(joinLatch, netTimeout, MILLISECONDS)) {
-                                    IgniteSpiException joinErr0 = joinErr;
-
-                                    if (joinErr0 != null)
-                                        throw joinErr0;
+                    Thread.sleep(2000);
+                }
+            }
 
-                                    if (reconFailed) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Failed to reconnect, will try to rejoin [locNode=" +
-                                                locNode + ']');
+            Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
 
-                                        U.closeQuiet(sock);
+            Iterator<InetSocketAddress> it = addrs.iterator();
 
-                                        U.interrupt(sockRdr);
-                                        U.join(sockRdr, log);
+            while (it.hasNext()) {
+                if (Thread.currentThread().isInterrupted())
+                    throw new InterruptedException();
 
-                                        this.sock = null;
+                InetSocketAddress addr = it.next();
 
-                                        return false;
-                                    }
+                Socket sock = null;
 
-                                    if (log.isDebugEnabled())
-                                        log.debug("Successfully connected to topology [sock=" + sock + ']');
+                try {
+                    long ts = U.currentTimeMillis();
 
-                                    hbSender = new HeartbeatSender();
-                                    hbSender.start();
+                    IgniteBiTuple<Socket, UUID> t = initConnection(addr);
 
-                                    stats.onJoinFinished();
+                    sock = t.get1();
 
-                                    return true;
-                                }
-                                else {
-                                    U.warn(log, "Join process timed out (will try other address) [sock=" + sock +
-                                        ", timeout=" + netTimeout + ']');
+                    UUID rmtNodeId = t.get2();
 
-                                    U.closeQuiet(sock);
+                    stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
 
-                                    U.interrupt(sockRdr);
-                                    U.join(sockRdr, log);
+                    locNode.clientRouterNodeId(rmtNodeId);
 
-                                    it.remove();
+                    TcpDiscoveryAbstractMessage msg = recon ?
+                        new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId,
+                            lastMsgId) :
+                        new TcpDiscoveryJoinRequestMessage(locNode, null);
 
-                                    break;
-                                }
+                    msg.client(true);
 
-                            case RES_CONTINUE_JOIN:
-                            case RES_WAIT:
-                                U.closeQuiet(sock);
+                    writeToSocket(sock, msg);
 
-                                break;
+                    int res = readReceipt(sock, ackTimeout);
 
-                            default:
-                                if (log.isDebugEnabled())
-                                    log.debug("Received unexpected response to join request: " + res);
+                    switch (res) {
+                        case RES_OK:
+                            return sock;
 
-                                U.closeQuiet(sock);
-                        }
-                    }
-                    catch (IgniteInterruptedCheckedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Joining thread was interrupted.");
+                        case RES_CONTINUE_JOIN:
+                        case RES_WAIT:
+                            U.closeQuiet(sock);
 
-                        return false;
-                    }
-                    catch (IOException | IgniteCheckedException e) {
-                        if (log.isDebugEnabled())
-                            U.error(log, "Failed to establish connection with address: " + addr, e);
+                            break;
 
-                        U.closeQuiet(sock);
+                        default:
+                            if (log.isDebugEnabled())
+                                log.debug("Received unexpected response to join request: " + res);
 
-                        it.remove();
+                            U.closeQuiet(sock);
                     }
                 }
+                catch (IOException | IgniteCheckedException e) {
+                    if (log.isDebugEnabled())
+                        U.error(log, "Failed to establish connection with address: " + addr, e);
 
-                if (addrs.isEmpty()) {
-                    U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
-                        "in 2000ms): " + addrs0);
+                    U.closeQuiet(sock);
 
-                    U.sleep(2000);
+                    it.remove();
                 }
             }
-            catch (IgniteInterruptedCheckedException ignored) {
-                if (log.isDebugEnabled())
-                    log.debug("Joining thread was interrupted.");
+
+            if (addrs.isEmpty()) {
+                U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
+                    "in 2000ms): " + addrs0);
+
+                Thread.sleep(2000);
             }
         }
-
-        return false;
     }
 
     /**
@@ -564,8 +519,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
     private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, IgniteCheckedException {
         assert addr != null;
 
-        joinLatch = new CountDownLatch(1);
-
         Socket sock = openSocket(addr);
 
         TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId());
@@ -590,300 +543,527 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
     void simulateNodeFailure() {
         U.warn(log, "Simulating client node failure: " + getLocalNodeId());
 
-        U.closeQuiet(sock);
+        U.interrupt(sockWriter);
+        U.interrupt(msgWorker);
+        U.interrupt(sockTimeoutWorker);
+
+        U.join(sockWriter, log);
+        U.join(msgWorker, log);
+        U.join(sockTimeoutWorker, log);
+
+        timer.cancel();
+    }
 
-        U.interrupt(disconnectHnd);
-        U.join(disconnectHnd, log);
+    /**
+     * Heartbeat sender.
+     */
+    private class HeartbeatSender extends TimerTask {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            if (!getSpiContext().isStopping() && sockWriter.isOnline()) {
+                TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getLocalNodeId());
 
-        U.interrupt(hbSender);
-        U.join(hbSender, log);
+                UUID nodeId = ignite.configuration().getNodeId();
 
-        U.interrupt(sockRdr);
-        U.join(sockRdr, log);
+                msg.setMetrics(nodeId, metricsProvider.metrics());
 
-        U.interrupt(sockTimeoutWorker);
-        U.join(sockTimeoutWorker, log);
+                msg.setCacheMetrics(nodeId, metricsProvider.cacheMetrics());
+
+                msg.client(true);
+
+                sockWriter.sendMessage(msg);
+            }
+        }
     }
 
     /**
-     * Disconnect handler.
+     * Socket reader.
      */
-    private class DisconnectHandler extends IgniteSpiThread {
+    private class SocketReader extends IgniteSpiThread {
+        /** */
+        private final Object mux = new Object();
+
+        /** */
+        private Socket sock;
+
+        /** */
+        private UUID rmtNodeId;
+
+        /**
+         */
+        protected SocketReader() {
+            super(gridName, "tcp-client-disco-sock-reader", log);
+        }
+
         /**
+         * @param sock Socket.
+         * @param rmtNodeId Remote node ID.
          */
-        protected DisconnectHandler() {
-            super(gridName, "tcp-client-disco-disconnect-hnd", log);
+        public void setSocket(Socket sock, UUID rmtNodeId) {
+            synchronized (mux) {
+                this.sock = sock;
+
+                this.rmtNodeId = rmtNodeId;
+
+                mux.notifyAll();
+            }
         }
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             while (!isInterrupted()) {
+                Socket sock;
+                UUID rmtNodeId;
+
+                synchronized (mux) {
+                    if (this.sock == null) {
+                        mux.wait();
+
+                        continue;
+                    }
+
+                    sock = this.sock;
+                    rmtNodeId = this.rmtNodeId;
+                }
+
                 try {
-                    U.sleep(disconnectCheckInt);
+                    try {
+                        InputStream in = new BufferedInputStream(sock.getInputStream());
 
-                    if (sock == null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Node is disconnected from topology, will try to reconnect.");
+                        sock.setKeepAlive(true);
+                        sock.setTcpNoDelay(true);
+
+                        while (!isInterrupted()) {
+                            TcpDiscoveryAbstractMessage msg;
+
+                            try {
+                                msg = marsh.unmarshal(in, U.gridClassLoader());
 
-                        U.interrupt(hbSender);
-                        U.join(hbSender, log);
+                                System.out.println("TcpClientDiscoverySpi.SocketReader: read: " + msg);
+                            }
+                            catch (IgniteCheckedException e) {
+                                if (log.isDebugEnabled())
+                                    U.error(log, "Failed to read message [sock=" + sock + ", " +
+                                        "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e);
+
+                                IOException ioEx = X.cause(e, IOException.class);
+
+                                if (ioEx != null)
+                                    throw ioEx;
+
+                                ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class);
+
+                                if (clsNotFoundEx != null)
+                                    LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
+                                        "(make sure same versions of all classes are available on all nodes) " +
+                                        "[rmtNodeId=" + rmtNodeId + ", err=" + clsNotFoundEx.getMessage() + ']');
+                                else
+                                    LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" +
+                                        getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']');
 
-                        U.interrupt(sockRdr);
-                        U.join(sockRdr, log);
+                                continue;
+                            }
+
+                            msg.senderNodeId(rmtNodeId);
 
-                        // If reconnection fails, try to rejoin.
-                        if (!joinTopology(true)) {
-                            rmtNodes.clear();
+                            if (log.isDebugEnabled())
+                                log.debug("Message has been received: " + msg);
 
-                            locNode.order(0);
+                            stats.onMessageReceived(msg);
 
-                            joinTopology(false);
+                            if (ensured(msg))
+                                lastMsgId = msg.id();
 
-                            getSpiContext().recordEvent(new DiscoveryEvent(locNode,
-                                "Client node reconnected: " + locNode,
-                                EVT_CLIENT_NODE_RECONNECTED, locNode));
+                            msgWorker.addMessage(msg);
                         }
                     }
-                }
-                catch (IgniteInterruptedCheckedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Disconnect handler was interrupted.");
+                    catch (IOException e) {
+                        System.out.println("TcpClientDiscoverySpi.SocketReader: IOException: " + e);
 
-                    return;
+                        msgWorker.addMessage(new SocketClosedMessage(sock));
+
+                        if (log.isDebugEnabled())
+                            U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e);
+                    }
                 }
-                catch (IgniteSpiException e) {
-                    U.error(log, "Failed to reconnect to topology after failure.", e);
+                finally {
+                    System.out.println("TcpClientDiscoverySpi.SocketReader: Closed");
+
+                    U.closeQuiet(sock);
+
+                    synchronized (mux) {
+                        if (this.sock == sock) {
+                            this.sock = null;
+                            this.rmtNodeId = null;
+                        }
+                    }
                 }
             }
         }
     }
 
     /**
-     * Heartbeat sender.
+     *
      */
-    private class HeartbeatSender extends IgniteSpiThread {
+    private class SocketWriter extends IgniteSpiThread {
+        /** */
+        private final Object mux = new Object();
+
+        /** */
+        private Socket sock;
+
+        /** */
+        private final Queue<TcpDiscoveryAbstractMessage> queue = new LinkedList<>();
+
         /**
+         *
          */
-        protected HeartbeatSender() {
-            super(gridName, "tcp-client-disco-heartbeat-sender", log);
+        protected SocketWriter() {
+            super(gridName, "tcp-client-disco-sock-writer", log);
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void sendMessage(TcpDiscoveryAbstractMessage msg) {
+            synchronized (mux) {
+                queue.add(msg);
+
+                mux.notifyAll();
+            }
+        }
+
+        /**
+         * @param sock Socket.
+         */
+        private void setSocket(Socket sock) {
+            synchronized (mux) {
+                this.sock = sock;
+
+                mux.notifyAll();
+            }
+        }
+
+        /**
+         *
+         */
+        public boolean isOnline() {
+            synchronized (mux) {
+                return sock != null;
+            }
         }
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
-            Socket sock0 = sock;
+            TcpDiscoveryAbstractMessage msg = null;
 
-            if (sock0 == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to start heartbeat sender, node is already disconnected.");
+            while (!Thread.currentThread().isInterrupted()) {
+                Socket sock;
 
-                return;
-            }
+                synchronized (mux) {
+                    sock = this.sock;
 
-            try {
-                while (!isInterrupted()) {
-                    U.sleep(hbFreq);
+                    if (sock == null) {
+                        mux.wait();
 
-                    TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getLocalNodeId());
+                        continue;
+                    }
 
-                    msg.client(true);
+                    if (msg == null)
+                        msg = queue.poll();
+
+                    if (msg == null) {
+                        mux.wait();
 
-                    sockRdr.addMessage(msg);
+                        continue;
+                    }
+                }
+
+                try {
+                    writeToSocket(sock, msg);
+
+                    msg = null;
+                }
+                catch (IOException e) {
+                    System.out.println("TcpClientDiscoverSpi.SocketWriter: IOException: " + e);
+
+                    if (log.isDebugEnabled())
+                        U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock + ']', e);
+
+                    U.closeQuiet(sock);
+
+                    synchronized (mux) {
+                        if (sock == this.sock)
+                            this.sock = null; // Connection has dead.
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    System.out.println("TcpClientDiscoverSpi.SocketWriter: ICException: " + e);
+
+                    log.error("Failed to send message: " + msg, e);
+
+                    msg = null;
                 }
-            }
-            catch (IgniteInterruptedCheckedException ignored) {
-                if (log.isDebugEnabled())
-                    log.debug("Heartbeat sender was interrupted.");
             }
         }
     }
 
     /**
-     * Socket reader.
+     *
      */
-    private class SocketReader extends IgniteSpiThread {
-        /** Remote node ID. */
-        private final UUID nodeId;
+    private class Reconnector extends IgniteSpiThread {
+        /** */
+        private volatile Socket sock;
 
-        /** Message worker. */
-        private final MessageWorker msgWrk;
+        /** */
+        private Collection<TcpDiscoveryAbstractMessage> pendingMsg;
 
         /**
-         * @param nodeId Node ID.
-         * @param msgWrk Message worker.
+         *
          */
-        protected SocketReader(UUID nodeId, MessageWorker msgWrk) {
-            super(gridName, "tcp-client-disco-sock-reader", log);
-
-            assert nodeId != null;
-            assert msgWrk != null;
-
-            this.nodeId = nodeId;
-            this.msgWrk = msgWrk;
+        protected Reconnector() {
+            super(gridName, "tcp-client-disco-msg-worker", log);
         }
 
-        /** {@inheritDoc} */
-        @Override public synchronized void start() {
-            super.start();
+        /**
+         *
+         */
+        public void cancel() {
+            interrupt();
 
-            msgWrk.start();
+            U.closeQuiet(sock);
         }
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
-            Socket sock0 = sock;
+            assert !segmentation;
 
-            if (sock0 == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to start socket reader, node is already disconnected.");
+            Socket sock = null;
 
-                return;
+            try {
+                sock = joinTopology(false);
+            }
+            finally {
+                if (sock == null)
+                    msgWorker.addMessage(SPI_RECONNECT_FAILED);
             }
 
-            try {
-                InputStream in = new BufferedInputStream(sock0.getInputStream());
+            boolean success = false;
 
-                sock0.setKeepAlive(true);
-                sock0.setTcpNoDelay(true);
+            try {
+                this.sock = sock;
 
-                while (!isInterrupted()) {
-                    try {
-                        TcpDiscoveryAbstractMessage msg = marsh.unmarshal(in, U.gridClassLoader());
+                if (isInterrupted())
+                    throw new InterruptedException();
 
-                        msg.senderNodeId(nodeId);
+                InputStream in = new BufferedInputStream(sock.getInputStream());
 
-                        if (log.isDebugEnabled())
-                            log.debug("Message has been received: " + msg);
+                sock.setKeepAlive(true);
+                sock.setTcpNoDelay(true);
 
-                        stats.onMessageReceived(msg);
+                // Wait for the reconnect response addressed to the local node.
+                while (!isInterrupted()) {
+                    TcpDiscoveryAbstractMessage msg = marsh.unmarshal(in, U.gridClassLoader());
 
-                        if (joinLatch.getCount() > 0) {
-                            IgniteSpiException err = null;
+                    if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+                        TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
 
-                            if (msg instanceof TcpDiscoveryDuplicateIdMessage)
-                                err = duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
-                            else if (msg instanceof TcpDiscoveryAuthFailedMessage)
-                                err = authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
-                            else if (msg instanceof TcpDiscoveryCheckFailedMessage)
-                                err = checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
+                        if (res.creatorNodeId().equals(getLocalNodeId())) {
+                            pendingMsg = res.pendingMessages();
 
-                            if (err != null) {
-                                joinErr = err;
+                            msgWorker.addMessage(SPI_RECONNECT_SUCCESS);
 
-                                joinLatch.countDown();
+                            success = true;
 
-                                return;
-                            }
+                            break;
                         }
-
-                        msgWrk.addMessage(msg);
                     }
-                    catch (IgniteCheckedException e) {
-                        if (log.isDebugEnabled())
-                            U.error(log, "Failed to read message [sock=" + sock0 + ", " +
-                                "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + nodeId + ']', e);
-
-                        IOException ioEx = X.cause(e, IOException.class);
 
-                        if (ioEx != null)
-                            throw ioEx;
-
-                        ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class);
-
-                        if (clsNotFoundEx != null)
-                            LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
-                                "(make sure same versions of all classes are available on all nodes) " +
-                                "[rmtNodeId=" + nodeId + ", err=" + clsNotFoundEx.getMessage() + ']');
-                        else
-                            LT.error(log, e, "Failed to read message [sock=" + sock0 + ", locNodeId=" +
-                                getLocalNodeId() + ", rmtNodeId=" + nodeId + ']');
-                    }
                 }
             }
-            catch (IOException e) {
-                if (log.isDebugEnabled())
-                    U.error(log, "Connection failed [sock=" + sock0 + ", locNodeId=" +
-                        getLocalNodeId() + ", rmtNodeId=" + nodeId + ']', e);
+            catch (IOException | IgniteCheckedException e) {
+                log.error("Failed to reconnect", e);
             }
             finally {
-                U.closeQuiet(sock0);
+                if (!success) {
+                    U.closeQuiet(sock);
 
-                U.interrupt(msgWrk);
-
-                try {
-                    U.join(msgWrk);
-                }
-                catch (IgniteInterruptedCheckedException ignored) {
-                    // No-op.
+                    msgWorker.addMessage(SPI_RECONNECT_FAILED);
                 }
-
-                sock = null;
             }
         }
-
-        /**
-         * @param msg Message.
-         */
-        void addMessage(TcpDiscoveryAbstractMessage msg) {
-            assert msg != null;
-
-            msgWrk.addMessage(msg);
-        }
     }
 
     /**
      * Message worker.
      */
-    private class MessageWorker extends MessageWorkerAdapter {
+    private class MessageWorker extends IgniteSpiThread {
         /** Topology history. */
         private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
 
-        /** Indicates that reconnection is in progress. */
-        private boolean recon;
+        /** Message queue. */
+        private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
+
+        /** */
+        private Socket currSock;
 
         /** Indicates that pending messages are currently processed. */
         private boolean pending;
 
+        /** */
+        private Reconnector reconnector;
+
         /**
-         * @param recon Whether reconnection is in progress.
+         *
          */
-        protected MessageWorker(boolean recon) {
-            super("tcp-client-disco-msg-worker");
+        private MessageWorker() {
+            super(gridName, "tcp-client-disco-msg-worker", log);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("InfiniteLoopStatement")
+        @Override protected void body() throws InterruptedException {
+            stats.onJoinStarted();
+
+            try {
+                final Socket sock = joinTopology(false);
+
+                currSock = sock;
+
+                sockWriter.setSocket(sock);
+
+                timer.schedule(new TimerTask() {
+                    @Override public void run() {
+                        if (joinLatch.getCount() > 0)
+                           queue.add(JOIN_TIMEOUT);
+                    }
+                }, netTimeout);
+
+                sockReader.setSocket(sock, locNode.clientRouterNodeId());
+
+                while (true) {
+                    Object msg = queue.take();
+
+                    System.out.println("TcpClientDiscoverySpi.MessageWorker: process: " + msg);
+
+                    if (msg == JOIN_TIMEOUT) {
+                        if (joinLatch.getCount() > 0) {
+                            joinErr = new IgniteSpiException("Join process timed out [sock=" + sock +
+                                ", timeout=" + netTimeout + ']');
+
+                            joinLatch.countDown();
+                        }
+                    }
+                    else if (msg == SPI_STOP) {
+                        assert getSpiContext().isStopping();
+
+                        if (currSock != null) {
+                            TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
+
+                            leftMsg.client(true);
+
+                            sockWriter.sendMessage(leftMsg);
+                        }
+                        else
+                            leaveLatch.countDown();
+                    }
+                    else if (msg instanceof SocketClosedMessage) {
+                        if (((SocketClosedMessage)msg).sock == currSock) {
+                            currSock = null;
+                            // todo
+
+                            if (joinLatch.getCount() > 0) {
+                                joinErr = new IgniteSpiException("Failed to connect to cluster: socket closed.");
+
+                                joinLatch.countDown();
+                            }
+                            else {
+                                if (getSpiContext().isStopping() || segmentation)
+                                    leaveLatch.countDown();
+                                else {
+                                    assert reconnector == null;
+
+                                    reconnector = new Reconnector();
+                                    reconnector.start();
+
+                                    timer.schedule(new TimerTask() {
+                                        @Override public void run() {
+                                            msgWorker.addMessage(RECONNECT_TIMEOUT);
+                                        }
+                                    }, netTimeout);
+                                }
+                            }
+                        }
+                    }
+                    else if (msg == SPI_RECONNECT_FAILED) {
+                        segmentation = true;
+
+                        reconnector.cancel();
+                        reconnector.join();
+                    }
+                    else {
+                        TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
+
+                        if (joinLatch.getCount() > 0) {
+                            IgniteSpiException err = null;
+
+                            if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
+                                err = duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
+                            else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage)
+                                err = authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
+                            else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage)
+                                err = checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
 
-            this.recon = recon;
+                            if (err != null) {
+                                joinErr = err;
+
+                                joinLatch.countDown();
+
+                                continue;
+                            }
+                        }
+
+                        processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg);
+                    }
+                }
+            }
+            finally {
+                U.closeQuiet(currSock);
+
+                if (joinLatch.getCount() > 0) {
+                    // This should not occur.
+                    joinErr = new IgniteSpiException("Some error occurs in joinig process");
+
+                    joinLatch.countDown();
+                }
+
+                if (reconnector != null) {
+                    reconnector.cancel();
+
+                    reconnector.join();
+                }
+            }
         }
 
         /** {@inheritDoc} */
-        @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+        protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) {
             assert msg != null;
             assert msg.verified() || msg.senderNodeId() == null;
 
             stats.onMessageProcessingStarted(msg);
 
-            if (msg instanceof TcpDiscoveryClientReconnectMessage)
-                processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
-            else {
-                if (recon && !pending) {
-                    if (log.isDebugEnabled())
-                        log.debug("Discarding message received during reconnection: " + msg);
-                }
-                else {
-                    if (msg instanceof TcpDiscoveryNodeAddedMessage)
-                        processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
-                    else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
-                        processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
-                    else if (msg instanceof TcpDiscoveryNodeLeftMessage)
-                        processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
-                    else if (msg instanceof TcpDiscoveryNodeFailedMessage)
-                        processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
-                    else if (msg instanceof TcpDiscoveryHeartbeatMessage)
-                        processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
-                    else if (msg instanceof TcpDiscoveryCustomEventMessage)
-                        processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
-
-                    if (ensured(msg))
-                        lastMsgId = msg.id();
-                }
-            }
+            if (msg instanceof TcpDiscoveryNodeAddedMessage)
+                processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
+            else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+                processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
+            else if (msg instanceof TcpDiscoveryNodeLeftMessage)
+                processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
+            else if (msg instanceof TcpDiscoveryNodeFailedMessage)
+                processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
+            else if (msg instanceof TcpDiscoveryHeartbeatMessage)
+                processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
+            else if (msg instanceof TcpDiscoveryCustomEventMessage)
+                processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
 
             stats.onMessageProcessingFinished(msg);
         }
@@ -892,7 +1072,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
          * @param msg Message.
          */
         private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
-            if (leaveLatch != null)
+            if (getSpiContext().isStopping())
                 return;
 
             TcpDiscoveryNode node = msg.node();
@@ -954,7 +1134,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
          * @param msg Message.
          */
         private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
-            if (leaveLatch != null)
+            if (getSpiContext().isStopping())
                 return;
 
             if (getLocalNodeId().equals(msg.nodeId())) {
@@ -968,6 +1148,8 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                     joinErr = null;
 
                     joinLatch.countDown();
+
+                    stats.onJoinFinished();
                 }
                 else if (log.isDebugEnabled())
                     log.debug("Discarding node add finished message (this message has already been processed) " +
@@ -1014,14 +1196,10 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                 if (log.isDebugEnabled())
                     log.debug("Received node left message for local node: " + msg);
 
-                CountDownLatch leaveLatch0 = leaveLatch;
-
-                assert leaveLatch0 != null;
-
-                leaveLatch0.countDown();
+                leaveLatch.countDown();
             }
             else {
-                if (leaveLatch != null)
+                if (getSpiContext().isStopping())
                     return;
 
                 TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId());
@@ -1052,8 +1230,17 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
          * @param msg Message.
          */
         private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
-            if (leaveLatch != null)
+            if (getSpiContext().isStopping()) {
+                if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) {
+                    if (leaveLatch.getCount() > 0) {
+                        log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId()
+                            + ", rmtNode=" + msg.creatorNodeId() + ']');
+
+                        leaveLatch.countDown();
+                    }
+                }
                 return;
+            }
 
             if (!getLocalNodeId().equals(msg.creatorNodeId())) {
                 TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId());
@@ -1084,42 +1271,13 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
          * @param msg Message.
          */
         private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
-            if (leaveLatch != null)
+            if (getSpiContext().isStopping())
                 return;
 
             if (getLocalNodeId().equals(msg.creatorNodeId())) {
-                if (msg.senderNodeId() == null) {
-                    Socket sock0 = sock;
-
-                    if (sock0 != null) {
-                        UUID nodeId = ignite.configuration().getNodeId();
-
-                        msg.setMetrics(nodeId, metricsProvider.metrics());
+                assert msg.senderNodeId() != null;
 
-                        msg.setCacheMetrics(nodeId, metricsProvider.cacheMetrics());
-
-                        try {
-                            writeToSocket(sock0, msg);
-
-                            if (log.isDebugEnabled())
-                                log.debug("Heartbeat message sent [sock=" + sock0 + ", msg=" + msg + ']');
-                        }
-                        catch (IOException | IgniteCheckedException e) {
-                            if (log.isDebugEnabled())
-                                U.error(log, "Failed to send heartbeat message [sock=" + sock0 +
-                                    ", msg=" + msg + ']', e);
-
-                            U.closeQuiet(sock0);
-
-                            sock = null;
-
-                            interrupt();
-                        }
-                    }
-                    else if (log.isDebugEnabled())
-                        log.debug("Failed to send heartbeat message (node is disconnected): " + msg);
-                }
-                else if (log.isDebugEnabled())
+                if (log.isDebugEnabled())
                     log.debug("Received heartbeat response: " + msg);
             }
             else {
@@ -1147,35 +1305,30 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
          * @param msg Message.
          */
         private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
-            if (leaveLatch != null)
+            if (getSpiContext().isStopping())
                 return;
 
             if (getLocalNodeId().equals(msg.creatorNodeId())) {
+
+            }
+
+            if (getLocalNodeId().equals(msg.creatorNodeId())) {
                 if (msg.success()) {
                     pending = true;
 
                     try {
                         for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages())
-                            processMessage(pendingMsg);
+                            processDiscoveryMessage(pendingMsg);
                     }
                     finally {
                         pending = false;
                     }
-
-                    joinErr = null;
-                    reconFailed = false;
-
-                    joinLatch.countDown();
                 }
                 else {
-                    joinErr = null;
-                    reconFailed = true;
-
                     getSpiContext().recordEvent(new DiscoveryEvent(locNode,
                         "Client node disconnected: " + locNode,
-                        EVT_CLIENT_NODE_DISCONNECTED, locNode));
+                        EVT_NODE_SEGMENTED, locNode));
 
-                    joinLatch.countDown();
                 }
             }
             else if (log.isDebugEnabled())
@@ -1208,33 +1361,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                         log.debug("Received metrics from unknown node: " + nodeId);
                 }
             }
-            else {
-                if (getLocalNodeId().equals(msg.creatorNodeId())) {
-                    Socket sock0 = sock;
-
-                    if (sock0 != null) {
-                        try {
-                            writeToSocket(sock0, msg);
-
-                            if (log.isDebugEnabled())
-                                log.debug("Heartbeat message sent [sock=" + sock0 + ", msg=" + msg + ']');
-                        }
-                        catch (IOException | IgniteCheckedException e) {
-                            if (log.isDebugEnabled())
-                                U.error(log, "Failed to send custom message [sock=" + sock0 +
-                                    ", msg=" + msg + ']', e);
-
-                            U.closeQuiet(sock0);
-
-                            sock = null;
-
-                            interrupt();
-                        }
-                    }
-                    else if (log.isDebugEnabled())
-                        log.debug("Failed to send custom message (node is disconnected): " + msg);
-                }
-            }
         }
 
         /**
@@ -1338,5 +1464,34 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                 log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
                     ", topVer=" + topVer + ']');
         }
+
+        /**
+         * @param msg Message.
+         */
+        public void addMessage(Object msg) {
+            queue.add(msg);
+        }
+
+        /**
+         *
+         */
+        public int queueSize() {
+            return queue.size();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class SocketClosedMessage {
+        /** */
+        private final Socket sock;
+
+        /**
+         * @param sock Socket.
+         */
+        private SocketClosedMessage(Socket sock) {
+            this.sock = sock;
+        }
     }
 }


Mime
View raw message