ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [35/50] [abbrv] ignite git commit: io - ack opts
Date Mon, 10 Oct 2016 14:57:40 GMT
io - ack opts


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc6bae81
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc6bae81
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc6bae81

Branch: refs/heads/ignite-gg-8-io2-park
Commit: dc6bae819851b40e992f76a508c81c4e882d2f80
Parents: 27762e9
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Oct 3 12:08:24 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Oct 3 13:11:30 2016 +0300

----------------------------------------------------------------------
 .../rest/protocols/tcp/MockNioSession.java      |   5 +
 .../managers/communication/GridIoManager.java   |  10 -
 .../ignite/internal/util/nio/GridNioServer.java | 253 +++++++++++--------
 .../internal/util/nio/GridNioSession.java       |   2 +
 .../internal/util/nio/GridNioSessionImpl.java   |   5 +
 .../util/nio/GridSelectorNioSessionImpl.java    |  20 ++
 .../communication/tcp/TcpCommunicationSpi.java  |   3 +-
 .../nio/impl/GridNioFilterChainSelfTest.java    |   5 +
 8 files changed, 193 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dc6bae81/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
index e848653..4c9fe77 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
@@ -149,4 +149,9 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS
     @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Override public void systemMessage(Object msg) {
+        // No-op.
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc6bae81/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index e055f45..bfed190 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -750,16 +750,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 return;
             }
 
-            // Check discovery.
-            ClusterNode node = ctx.discovery().node(nodeId);
-
-            if (node == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Ignoring message from dead node [senderId=" + nodeId + ", msg=" + msg + ']');
-
-                return; // We can't receive messages from non-discovered ones.
-            }
-
             if (msg.topic() == null) {
                 int topicOrd = msg.topicOrdinal();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc6bae81/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 1c4599c..97d676a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -973,6 +973,12 @@ public class GridNioServer<T> {
                     readBuf.compact();
                 else
                     readBuf.clear();
+
+                if (ses.hasSystemMessage() && !ses.procWrite.get()) {
+                    ses.procWrite.set(true);
+
+                    registerWrite(ses);
+                }
             }
             catch (IgniteCheckedException e) {
                 close(ses, e);
@@ -1049,21 +1055,25 @@ public class GridNioServer<T> {
 
                 while (true) {
                     if (req == null) {
-                        req = (NioOperationFuture<?>)ses.pollFuture();
+                        req = systemMessage(ses);
 
-                        if (req == null && buf.position() == 0) {
-                            if (ses.procWrite.get()) {
-                                boolean set = ses.procWrite.compareAndSet(true, false);
+                        if (req == null) {
+                            req = (NioOperationFuture<?>)ses.pollFuture();
 
-                                assert set;
+                            if (req == null && buf.position() == 0) {
+                                if (ses.procWrite.get()) {
+                                    boolean set = ses.procWrite.compareAndSet(true, false);
 
-                                if (ses.writeQueue().isEmpty())
-                                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-                                else
-                                    ses.procWrite.set(true);
-                            }
+                                    assert set;
 
-                            break;
+                                    if (ses.writeQueue().isEmpty())
+                                        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                                    else
+                                        ses.procWrite.set(true);
+                                }
+
+                                break;
+                            }
                         }
                     }
 
@@ -1088,7 +1098,10 @@ public class GridNioServer<T> {
                     while (finished) {
                         req.onDone();
 
-                        req = (NioOperationFuture<?>)ses.pollFuture();
+                        req = systemMessage(ses);
+
+                        if (req == null)
+                            req = (NioOperationFuture<?>)ses.pollFuture();
 
                         if (req == null)
                             break;
@@ -1197,6 +1210,27 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param ses Session.
+         * @return System message request.
+         */
+        private NioOperationFuture<?> systemMessage(GridSelectorNioSessionImpl ses) {
+            if (ses.hasSystemMessage()) {
+                Object msg = ses.systemMessage();
+
+                NioOperationFuture req = new NioOperationFuture<>(ses,
+                    NioOperation.REQUIRE_WRITE,
+                    (Message)msg,
+                    true);
+
+                assert !ses.hasSystemMessage();
+
+                return req;
+            }
+
+            return null;
+        }
+
+        /**
          * Processes write-ready event on the key.
          *
          * @param key Key that is ready to be written.
@@ -1222,21 +1256,25 @@ public class GridNioServer<T> {
             }
 
             if (req == null) {
-                req = (NioOperationFuture<?>)ses.pollFuture();
+                req = systemMessage(ses);
 
-                if (req == null && buf.position() == 0) {
-                    if (ses.procWrite.get()) {
-                        boolean set = ses.procWrite.compareAndSet(true, false);
+                if (req == null) {
+                    req = (NioOperationFuture<?>)ses.pollFuture();
 
-                        assert set;
+                    if (req == null && buf.position() == 0) {
+                        if (ses.procWrite.get()) {
+                            boolean set = ses.procWrite.compareAndSet(true, false);
 
-                        if (ses.writeQueue().isEmpty())
-                            key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-                        else
-                            ses.procWrite.set(true);
-                    }
+                            assert set;
 
-                    return;
+                            if (ses.writeQueue().isEmpty())
+                                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                            else
+                                ses.procWrite.set(true);
+                        }
+
+                        return;
+                    }
                 }
             }
 
@@ -1261,7 +1299,10 @@ public class GridNioServer<T> {
             while (finished) {
                 req.onDone();
 
-                req = (NioOperationFuture<?>)ses.pollFuture();
+                req = systemMessage(ses);
+
+                if (req == null)
+                    req = (NioOperationFuture<?>)ses.pollFuture();
 
                 if (req == null)
                     break;
@@ -1455,15 +1496,7 @@ public class GridNioServer<T> {
                             }
 
                             case REQUIRE_WRITE: {
-                                //Just register write key.
-                                SelectionKey key = req.session().key();
-
-                                if (key.isValid()) {
-                                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-
-                                    // Update timestamp to protected against false write timeout.
-                                    ((GridNioSessionImpl)key.attachment()).bytesSent(0);
-                                }
+                                registerWrite(req.session());
 
                                 break;
                             }
@@ -1514,72 +1547,7 @@ public class GridNioServer<T> {
                             }
 
                             case DUMP_STATS: {
-                                StringBuilder sb = new StringBuilder();
-
-                                Set<SelectionKey> keys = selector.keys();
-
-                                sb.append(U.nl())
-                                    .append(">> Selector info [idx=").append(idx)
-                                    .append(", keysCnt=").append(keys.size())
-                                    .append("]").append(U.nl());
-
-                                for (SelectionKey key : keys) {
-                                    GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
-                                    MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-                                    MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
-
-                                    sb.append("    Connection info [")
-                                        .append("in=").append(ses.accepted())
-                                        .append(", rmtAddr=").append(ses.remoteAddress())
-                                        .append(", locAddr=").append(ses.localAddress());
-
-                                    GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
-
-                                    if (outDesc != null) {
-                                        sb.append(", msgsSent=").append(outDesc.sent())
-                                            .append(", msgsAckedByRmt=").append(outDesc.acked())
-                                            .append(", descIdHash=").append(System.identityHashCode(outDesc));
-                                    }
-                                    else
-                                        sb.append(", outRecoveryDesc=null");
-
-                                    GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
-
-                                    if (inDesc != null) {
-                                        sb.append(", msgsRcvd=").append(inDesc.received())
-                                            .append(", lastAcked=").append(inDesc.lastAcknowledged())
-                                            .append(", descIdHash=").append(System.identityHashCode(inDesc));
-                                    }
-                                    else
-                                        sb.append(", inRecoveryDesc=null");
-
-                                    sb.append(", bytesRcvd=").append(ses.bytesReceived())
-                                        .append(", bytesSent=").append(ses.bytesSent())
-                                        .append(", opQueueSize=").append(ses.writeQueueSize())
-                                        .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
-                                        .append(", msgReader=").append(reader != null ? reader.toString() : "null");
-
-                                    int cnt = 0;
-
-                                    for (GridNioFuture<?> fut : ses.writeQueue()) {
-                                        if (cnt == 0)
-                                            sb.append(",\n opQueue=[").append(fut);
-                                        else
-                                            sb.append(',').append(fut);
-
-                                        if (++cnt == 5) {
-                                            sb.append(']');
-
-                                            break;
-                                        }
-                                    }
-
-
-                                    sb.append("]").append(U.nl());
-                                }
-
-                                U.warn(log, sb.toString());
+                                dumpStats();
 
                                // Complete the request just in case (none should wait on this future).
                                 req.onDone(true);
@@ -1634,6 +1602,93 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param ses Session.
+         */
+        final void registerWrite(GridSelectorNioSessionImpl ses) {
+            SelectionKey key = ses.key();
+
+            if (key.isValid()) {
+                if ((key.interestOps() & SelectionKey.OP_WRITE) == 0)
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+
+                // Update timestamp to protected against false write timeout.
+                ses.bytesSent(0);
+            }
+        }
+
+        /**
+         *
+         */
+        private void dumpStats() {
+            StringBuilder sb = new StringBuilder();
+
+            Set<SelectionKey> keys = selector.keys();
+
+            sb.append(U.nl())
+                .append(">> Selector info [idx=").append(idx)
+                .append(", keysCnt=").append(keys.size())
+                .append("]").append(U.nl());
+
+            for (SelectionKey key : keys) {
+                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+
+                MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+                MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
+
+                sb.append("    Connection info [")
+                    .append("in=").append(ses.accepted())
+                    .append(", rmtAddr=").append(ses.remoteAddress())
+                    .append(", locAddr=").append(ses.localAddress());
+
+                GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
+
+                if (outDesc != null) {
+                    sb.append(", msgsSent=").append(outDesc.sent())
+                        .append(", msgsAckedByRmt=").append(outDesc.acked())
+                        .append(", descIdHash=").append(System.identityHashCode(outDesc));
+                }
+                else
+                    sb.append(", outRecoveryDesc=null");
+
+                GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
+
+                if (inDesc != null) {
+                    sb.append(", msgsRcvd=").append(inDesc.received())
+                        .append(", lastAcked=").append(inDesc.lastAcknowledged())
+                        .append(", descIdHash=").append(System.identityHashCode(inDesc));
+                }
+                else
+                    sb.append(", inRecoveryDesc=null");
+
+                sb.append(", bytesRcvd=").append(ses.bytesReceived())
+                    .append(", bytesSent=").append(ses.bytesSent())
+                    .append(", opQueueSize=").append(ses.writeQueueSize())
+                    .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
+                    .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+
+                int cnt = 0;
+
+                for (GridNioFuture<?> fut : ses.writeQueue()) {
+                    if (cnt == 0)
+                        sb.append(",\n opQueue=[").append(fut);
+                    else
+                        sb.append(',').append(fut);
+
+                    if (++cnt == 5) {
+                        sb.append(']');
+
+                        break;
+                    }
+                }
+
+
+                sb.append("]").append(U.nl());
+            }
+
+            U.warn(log, sb.toString());
+        }
+
+        /**
          * Processes keys selected by a selector.
          *
          * @param keys Selected keys.

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc6bae81/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index 1e427d8..bd60e52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -174,4 +174,6 @@ public interface GridNioSession {
      * @return Recovery descriptor if recovery is supported, {@code null otherwise.}
      */
     @Nullable public GridNioRecoveryDescriptor inRecoveryDescriptor();
+
+    public void systemMessage(Object msg);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc6bae81/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 53a624d..04c47b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -316,6 +316,11 @@ public class GridNioSessionImpl implements GridNioSession {
     }
 
     /** {@inheritDoc} */
+    @Override public void systemMessage(Object msg) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNioSessionImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc6bae81/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 6b00281..8c6996c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -68,6 +68,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** */
     final AtomicBoolean procWrite = new AtomicBoolean();
 
+    /** */
+    private Object sysMsg;
+
     /**
      * Creates session instance.
      *
@@ -321,6 +324,23 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public void systemMessage(Object sysMsg) {
+        this.sysMsg = sysMsg;
+    }
+
+    boolean hasSystemMessage() {
+        return sysMsg != null;
+    }
+
+    public Object systemMessage() {
+        Object ret = sysMsg;
+
+        sysMsg = null;
+
+        return ret;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridSelectorNioSessionImpl.class, this, super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc6bae81/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 79949d2..bda2fcb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -633,7 +633,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                         ", rcvCnt=" + rcvCnt + ']');
                                 }
 
-                                nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt));
+                                //nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt));
+                                ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
 
                                 recovery.lastAcknowledged(rcvCnt);
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc6bae81/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
index 58b91e4..ab6ee5c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
@@ -387,5 +387,10 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
         @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
             return null;
         }
+
+        /** {@inheritDoc} */
+        @Override public void systemMessage(Object msg) {
+            // No-op.
+        }
     }
 }
\ No newline at end of file


Mime
View raw message