ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [04/50] [abbrv] ignite git commit: IGNITE-3054 - Introduced nio message sender to release ring worker from marshalling.
Date Mon, 24 Apr 2017 15:03:27 GMT
IGNITE-3054 - Introduced nio message sender to release ring worker from marshalling.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/49ee23d0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/49ee23d0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/49ee23d0

Branch: refs/heads/ignite-3054
Commit: 49ee23d0496f2ccb83c5de849546d9c12142576f
Parents: 9287060
Author: dkarachentsev <dkarachentsev@gridgain.com>
Authored: Tue Nov 8 17:54:29 2016 +0300
Committer: dkarachentsev <dkarachentsev@gridgain.com>
Committed: Tue Nov 8 17:54:29 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 84 +++++++++++++++++++-
 1 file changed, 81 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/49ee23d0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 66bc8f9..e6d6323 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -106,6 +106,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -279,6 +280,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Nio server that serves client connections. */
     private GridNioServer clientNioSrv;
 
+    /** Client NIO messages. */
+    private BlockingDeque<NioMessage> clientNioMsgQueue;
+
     /**
      * @param adapter Adapter.
      */
@@ -353,11 +357,17 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         tcpSrvr = new TcpServer();
 
+        clientNioMsgQueue = new LinkedBlockingDeque<>();
+
         clientNioSrv = createClientNioServer();
         clientNioSrv.start();
 
         nioClientProcessingPool = new IgniteThreadPoolExecutor(
-            "disco-client-nio-msg-processor", gridName, 0, 2, 60_000L, new LinkedBlockingQueue<Runnable>());
+            "disco-client-nio-msg-processor", gridName, spi.getClientNioThreads() * 2, spi.getClientNioThreads() * 3,
+            60_000L, new LinkedBlockingQueue<Runnable>());
+
+        for (int i = 0; i < spi.getClientNioThreads(); i++)
+            nioClientProcessingPool.submit(new NioSendWorker(gridName, log, clientNioMsgQueue));
 
         spi.initLocalNode(tcpSrvr.port, true);
 
@@ -5685,7 +5695,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            sendMessage(msg, msgBytes);
+            if (msg.highPriority())
+                sendMessage(msg, msgBytes);
+            else
+                clientNioMsgQueue.add(new NioMessage(msg, msgBytes, this));
         }
 
         /**
@@ -6037,7 +6050,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             ack.verify(locNodeId);
 
-            clientMsgWrk.addMessage(ack);
+            clientMsgWrk.sendMessage(ack, null);
 
             if (heartbeatMsg != null)
                 clientMsgWrk.metrics(heartbeatMsg.metrics());
@@ -7953,4 +7966,69 @@ class ServerImpl extends TcpDiscoveryImpl {
                 buf.limit(oldLimit);
         }
     }
+
+    /**
+     * Discovery message, its bytes and the client NIO worker that should send it.
+     */
+    private static class NioMessage {
+        /** Discovery message to send. */
+        private final TcpDiscoveryAbstractMessage msg;
+
+        /** Message bytes to send. */
+        private final byte[] msgData;
+
+        /** Client NIO worker. */
+        private final ClientNioMessageWorker worker;
+
+        /**
+         * @param msg Discovery message to send.
+         * @param msgData Message bytes to send.
+         * @param worker Client NIO worker.
+         */
+        NioMessage(final TcpDiscoveryAbstractMessage msg, final byte[] msgData,
+            final ClientNioMessageWorker worker) {
+            this.msg = msg;
+            this.msgData = msgData;
+            this.worker = worker;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(NioMessage.class, this);
+        }
+    }
+
+    /**
+     * Sends queued client nio messages via their workers to release ring worker from marshalling.
+     */
+    private static class NioSendWorker extends GridWorker {
+        /** Queue of client NIO messages to send. */
+        private final BlockingDeque<NioMessage> msgQueue;
+
+        /**
+         * @param gridName Grid name.
+         * @param log Logger.
+         * @param queue Message queue.
+         */
+        NioSendWorker(@Nullable final String gridName, final IgniteLogger log,
+            final BlockingDeque<NioMessage> queue) {
+            super(gridName, "nio-client-sender", log);
+            msgQueue = queue;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            while (!isCancelled()) {
+                final NioMessage msg = msgQueue.poll(2000, TimeUnit.MILLISECONDS); // Null on timeout.
+
+                try {
+                    if (msg != null)
+                        msg.worker.sendMessage(msg.msg, msg.msgData);
+                }
+                catch (Exception e) {
+                    log.error("Failed to send message to client: [nioMsg=" + msg + ']', e);
+                }
+            }
+        }
+    }
 }


Mime
View raw message