ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [32/50] [abbrv] ignite git commit: ignite-3054 review
Date Mon, 24 Apr 2017 15:03:55 GMT
ignite-3054 review


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a3838d57
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a3838d57
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a3838d57

Branch: refs/heads/ignite-3054
Commit: a3838d57c1e68535092b1503ac388c4944b0aa30
Parents: 0123742
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Dec 13 14:07:22 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Dec 13 14:07:22 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  9 +--
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 64 ++++++++++++--------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  1 +
 3 files changed, 45 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a3838d57/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index fb68bde..305af3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -127,8 +127,7 @@ class ClientImpl extends TcpDiscoveryImpl {
     private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
 
     /** */
-    @SuppressWarnings("FieldCanBeLocal")
-    private static int DFLT_BYTE_ARR_STREAM_SIZE = 32 * 1024;
+    private static final int DFLT_BYTE_ARR_STREAM_SIZE = 32 * 1024;
 
     /** Remote nodes. */
     private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
@@ -650,8 +649,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                         ", rmtNodeId=" + rmtNodeId + ']');
 
                 return new JoinResult(new SocketStream(sock),
-                    spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
-                    res.clientAck(), res.asyncMode());
+                    spi.readReceipt(sock,
+                    timeoutHelper.nextTimeoutChunk(ackTimeout0)),
+                    res.clientAck(),
+                    res.asyncMode());
             }
             catch (IOException | IgniteCheckedException e) {
                 U.closeQuiet(sock);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a3838d57/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index b9a6b9d..d60d482 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -203,6 +203,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         IgniteProductVersion.fromString("1.5.0");
 
     /** Node ID in GridNioSession. */
+    // TODO: remove NODE_ID_META, use only NIO_WORKER_META.
     private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
 
     /** ClientNioMessageWorker in GridNioSession. */
@@ -299,6 +300,8 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** */
         private final ClientNioMessageProcessor msgProc = new ClientNioMessageProcessor(log);
 
+        // TODO: override onSessionWriteTimeout (see TcpCommSpi)
+
         @Override public void onConnected(final GridNioSession ses) {
             // No-op.
         }
@@ -306,9 +309,10 @@ class ServerImpl extends TcpDiscoveryImpl {
         @Override public void onDisconnected(final GridNioSession ses, @Nullable final Exception
e) {
             final UUID clientNodeId = msgProc.clientNodeId(ses);
 
-            if (log.isDebugEnabled())
+            if (log.isDebugEnabled()) {
                 log.debug("Stopping message worker on disconnect [remoteAddr=" + ses.remoteAddress()
+
                     ", remote node ID=" + clientNodeId + ']');
+            }
 
             final ClientNioMessageWorker proc = ses.meta(NIO_WORKER_META);
 
@@ -327,6 +331,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         msgProc.processMessage(ses, msg);
                     }
                     catch (IgniteCheckedException e) {
+                        // TODO: really close session.
                         log.error("Failure processing message, closing connection. [ses="
+ ses + ']', e);
 
                         final UUID nodeId = ses.meta(NODE_ID_META);
@@ -498,6 +503,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // Respond to client without message length.
                 byte[] msg0 = (byte[])msg;
 
+                // TODO: allocate ByteBuffer once for all clients or get rid of ByteBuffer
creation.
                 ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(msg0.length) : ByteBuffer.wrap(msg0);
 
                 if (directBuf) {
@@ -638,7 +644,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.join(msgWorker, log);
 
         for (ClientMessageProcessor clientWorker : clientMsgWorkers.values())
-            stopClientProcessor(clientWorker, true);
+            stopClientProcessor(clientWorker);
 
         clientNioSrv.stop();
 
@@ -706,9 +712,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
     /**
      * @param proc Client message processor.
-     * @throws IgniteSpiException
      */
-    private void stopClientProcessor(final ClientMessageProcessor proc, final boolean nioBlock)
throws IgniteSpiException {
+    private void stopClientProcessor(final ClientMessageProcessor proc) {
         if (proc == null)
             return;
 
@@ -722,14 +727,11 @@ class ServerImpl extends TcpDiscoveryImpl {
             else if (proc instanceof ClientNioMessageWorker) {
                 final ClientNioMessageWorker nioWrk = (ClientNioMessageWorker)proc;
 
-                if (nioBlock)
-                    nioWrk.stop();
-                else
-                    nioWrk.nonblockingStop();
+                nioWrk.stop();
             }
         }
         catch (IgniteCheckedException e) {
-            throw new IgniteSpiException(e.getMessage(), e);
+            U.error(log, "Failed to stop client processor: " + proc, e);
         }
     }
 
@@ -1800,7 +1802,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.join(msgWorker, log);
 
         for (ClientMessageProcessor msgWorker : clientMsgWorkers.values())
-            stopClientProcessor(msgWorker, true);
+            stopClientProcessor(msgWorker);
 
         clientNioSrv.stop();
 
@@ -5695,11 +5697,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
                 catch (AlreadyBoundException | IOException e) {
+                    // TODO: always create new channel on IOException.
+
                     // On simultaneous nodes startup on the same JVM ServerSocketChannel.bind()
                     // may start throwing 'SocketException: Invalid argument' on each invocation,
                     // even if port is free.
                     if (e instanceof SocketException && reopenTries < REOPEN_SERVER_SOCKET_CHANNEL_TRIES
-                        && e.getMessage().contains("Invalid argument")) {
+                        && e.getMessage() != null && e.getMessage().contains("Invalid
argument")) {
                         if (log.isDebugEnabled())
                             log.debug("Caught SocketException try to reopen channel. " +
                                 "[port=" + port + ", localHost=" + spi.locHost + ']');
@@ -5977,6 +5981,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             final Map<Integer, Object> meta = new HashMap<>();
 
             meta.put(NODE_ID_META, clientNodeId());
+            meta.put(NIO_WORKER_META, this);
 
             final SocketChannel ch = sock.getChannel();
 
@@ -5989,8 +5994,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             ses = clientNioSrv.createSession(ch, meta).get();
 
-            ses.addMeta(NIO_WORKER_META, this);
-
             state = WorkerState.STARTED;
         }
 
@@ -6019,14 +6022,16 @@ class ServerImpl extends TcpDiscoveryImpl {
             final IgniteInternalFuture<Object> res = ses.close().chain(new C1<IgniteInternalFuture<Boolean>,
Object>() {
                 @Override public Object apply(final IgniteInternalFuture<Boolean> fut)
{
                     try {
-                        return fut.get();
+                        fut.get();
                     }
                     catch (IgniteCheckedException e) {
-                        throw new IgniteSpiException(e.getMessage(), e);
+                        U.warn(log, "Failed to close client session: " + e);
                     }
                     finally {
                         U.closeQuiet(sock);
                     }
+
+                    return null;
                 }
             });
 
@@ -6034,6 +6039,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             nioWorkers.remove(this);
 
+            // TODO: not needed.
             ses.removeMeta(NIO_WORKER_META);
 
             return res;
@@ -6067,6 +6073,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 nonblockingStop();
 
+                // TODO: move 'clientMsgWorkers.remove' to nonblockingStop.
                 clientMsgWorkers.remove(clientNodeId, this);
 
                 return new GridNioFinishedFuture<>(e);
@@ -6208,6 +6215,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             final ClientNioMessageWorker clientMsgWrk = ses.meta(NIO_WORKER_META);
 
+            // TODO assert clientMsgWrk != null;
+
             if (clientMsgWrk == null) {
                 if (log.isDebugEnabled())
                     log.debug("NIO Worker has been closed, drop message. [clientNodeId="
@@ -6229,6 +6238,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (debugMode && recordable(msg))
                 debugLog(msg, "Message has been received: " + msg);
 
+            // TODO: handle only messages client really can send.
             if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
                 clientMsgWrk.addReceipt(RES_OK);
 
@@ -6241,6 +6251,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     final TcpDiscoverySpiState state = spiStateCopy();
 
                     if (state == CONNECTED) {
+                        // TODO: future is not needed.
                         clientMsgWrk.addReceipt(RES_OK, new CX1<IgniteInternalFuture<?>,
Object>() {
                             private static final long serialVersionUID = 0L;
 
@@ -6636,6 +6647,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (nioOldWrk.state() == WorkerState.STOPPED)
                                     clientMsgWorkers.remove(nodeId, nioOldWrk);
                                 else {
+                                    // TODO: implement 'join' method for ClientNioMessageWorker.
                                     // check if old worker is stopping
                                     for (int i = 0; i < 5; i++) {
                                         U.sleep(100);
@@ -6668,17 +6680,17 @@ class ServerImpl extends TcpDiscoveryImpl {
                         assert clientProc == clientMsgWorkers.get(nodeId);
 
                         clientMsgWrk = clientProc;
-
-                        nioClient = clientMsgWrk instanceof ClientNioMessageWorker;
                     }
 
-                    if (nioClient) {
+                    if (clientMsgWrk instanceof ClientNioMessageWorker) {
                         final ClientNioMessageWorker nioWrk = (ClientNioMessageWorker)clientMsgWrk;
 
                         nioWrk.start();
 
                         nioWrk.sendMessage(res, null);
 
+                        nioClient = true;
+
                         return;
                     }
 
@@ -7007,20 +7019,20 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
             finally {
+                if (nioClient)
+                    return;
+
                 if (clientMsgWrk != null) {
                     if (log.isDebugEnabled())
                         log.debug("Client connection failed [sock=" + sock + ", locNodeId="
+ locNodeId +
                             ", rmtNodeId=" + nodeId + ']');
 
-                    if (!nioClient) {
-                        clientMsgWorkers.remove(nodeId, clientMsgWrk);
+                    clientMsgWorkers.remove(nodeId, clientMsgWrk);
 
-                        U.interrupt((ClientMessageWorker)clientMsgWrk);
-                    }
+                    U.interrupt(clientMsgWrk);
                 }
 
-                if (!nioClient)
-                    U.closeQuiet(sock);
+                U.closeQuiet(sock);
             }
         }
 
@@ -7667,6 +7679,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             return delegate.getOOBInline();
         }
 
+        // TODO: check if synchronized is really needed.
         /** {@inheritDoc} */
         @Override public synchronized void setSoTimeout(final int timeout) throws SocketException
{
             delegate.setSoTimeout(timeout);
@@ -7727,6 +7740,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             return delegate.getReuseAddress();
         }
 
+        // TODO: check if synchronized is really needed.
         /** {@inheritDoc} */
         @Override public synchronized void close() throws IOException {
             U.closeQuiet(sslIn);
@@ -8032,7 +8046,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override public synchronized void write(final byte[] b, final int off, final int
len) throws IOException {
-            buf = expandBuffer(buf, len);
+            buf = expandBuffer(buf, len); // TODO: take 'off' into account when expand.
 
             buf.put(b, off, len);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a3838d57/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 3be12aa..9b2c8cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1487,6 +1487,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi,
T
         try {
             final SocketChannel ch = sock.getChannel();
 
+            // TODO: try remove this hack after all others issues are fixed.
             // Use channel directly as a workaround, because output stream
             // from NIO socket may block infinitely.
             if (ch != null && !(sock instanceof ServerImpl.NioSslSocket))


Mime
View raw message