ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [26/27] ignite git commit: ignite-comm-balance
Date Fri, 23 Sep 2016 16:06:00 GMT
ignite-comm-balance


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88a1ccb7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88a1ccb7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88a1ccb7

Branch: refs/heads/ignite-comm-balance
Commit: 88a1ccb7f61b8e18af3f85177c935212b1ef4568
Parents: 072f5a7
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Sep 23 19:00:44 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Sep 23 19:00:44 2016 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     | 10 +++---
 .../ignite/internal/util/nio/GridNioServer.java | 34 ++++++++++++++++----
 .../communication/tcp/TcpCommunicationSpi.java  |  5 +--
 .../GridTcpCommunicationSpiConfigSelfTest.java  |  2 --
 ...GridTcpCommunicationSpiRecoverySelfTest.java |  3 +-
 5 files changed, 35 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/88a1ccb7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 33cd752..bd6ac5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -201,11 +201,6 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE2:
-                msg = new TcpCommunicationSpi.HandshakeMessage2();
-
-                break;
-
             case 0:
                 msg = new GridJobCancelRequest();
 
@@ -761,6 +756,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 125:
+                msg = new TcpCommunicationSpi.HandshakeMessage2();
+
+                break;
+
             // [-3..119] [124-125] - this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/88a1ccb7/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 83ed513e..8ad7bde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -45,7 +45,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -147,13 +146,13 @@ public class GridNioServer<T> {
     /** Flag indicating if this server should use direct buffers. */
     private final boolean directBuf;
 
-    /** Index to select which thread will serve next in socket channel. Using round-robin balancing. */
+    /** Index to select which thread will serve next incoming socket channel. Using round-robin balancing. */
     @GridToStringExclude
-    private final AtomicInteger readBalanceIdx = new AtomicInteger();
+    private int readBalanceIdx;
 
     /** Index to select which thread will serve next out socket channel. Using round-robin balancing. */
     @GridToStringExclude
-    private final AtomicInteger writeBalanceIdx = new AtomicInteger(1);
+    private int writeBalanceIdx = 1;
 
     /** Tcp no delay flag. */
     private final boolean tcpNoDelay;
@@ -691,9 +690,32 @@ public class GridNioServer<T> {
         assert req.operation() == NioOperation.REGISTER : req;
         assert req.socketChannel() != null : req;
 
-        int balanceIdx = req.accepted() ? readBalanceIdx.getAndAdd(2) : writeBalanceIdx.getAndAdd(2);
+        int workes = clientWorkers.size();
 
-        clientWorkers.get(balanceIdx & (clientWorkers.size() - 1)).offer(req);
+        int balanceIdx;
+
+        if (workes > 1) {
+            if (req.accepted()) {
+                balanceIdx = readBalanceIdx;
+
+                readBalanceIdx += 2;
+
+                if (readBalanceIdx >= workes)
+                    readBalanceIdx = 0;
+            }
+            else {
+                balanceIdx = writeBalanceIdx;
+
+                writeBalanceIdx += 2;
+
+                if (writeBalanceIdx >= workes)
+                    writeBalanceIdx = 1;
+            }
+        }
+        else
+            balanceIdx = 0;
+
+        clientWorkers.get(balanceIdx).offer(req);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/88a1ccb7/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index d3f5bfb..efb29e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -333,9 +333,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final byte HANDSHAKE_MSG_TYPE = -3;
 
     /** */
-    public static final byte HANDSHAKE_MSG_TYPE2 = 125;
-
-    /** */
     private ConnectGateway connectGate;
 
     /** */
@@ -3975,7 +3972,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         /** {@inheritDoc} */
         @Override public byte directType() {
-            return HANDSHAKE_MSG_TYPE2;
+            return 125;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/88a1ccb7/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
index c84ee32..5345a9b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
@@ -18,8 +18,6 @@
 package org.apache.ignite.spi.communication.tcp;
 
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractConfigTest;
 import org.apache.ignite.testframework.junits.spi.GridSpiTest;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/88a1ccb7/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 3d33fff..91014dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -451,8 +451,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
 
                     // Wait when session is closed, then try to open new connection from node1.
                     GridTestUtils.waitForCondition(new GridAbsPredicate() {
-                        @Override
-                        public boolean apply() {
+                        @Override public boolean apply() {
                             return ses1.closeTime() != 0;
                         }
                     }, awaitForSocketWriteTimeout());


Mime
View raw message