ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [3/5] ignite git commit: nio balance
Date Thu, 15 Sep 2016 15:58:52 GMT
nio balance


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc85af16
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc85af16
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc85af16

Branch: refs/heads/ignite-comm-balance
Commit: bc85af1680acca18f6d764e09ca2ecacb48c2319
Parents: 0b7ff82
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Sep 15 15:12:50 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Sep 15 15:12:50 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/IgniteUtils.java       |  12 +
 .../ignite/internal/util/nio/GridNioServer.java | 361 ++++++++++++++++++-
 .../internal/util/nio/GridNioSessionImpl.java   |  50 +++
 .../util/nio/GridSelectorNioSessionImpl.java    |   9 +-
 .../IgniteCommunicationBalanceTest.java         | 118 ++++++
 5 files changed, 536 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 93acc75..c7529bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -8307,6 +8307,18 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Gets absolute value for long. If argument is {@link Long#MIN_VALUE}, then {@code 0} is returned.
+     *
+     * @param i Argument.
+     * @return Absolute value, or {@code 0} for {@link Long#MIN_VALUE}, whose {@code Math.abs} result is still negative.
+     */
+    public static long safeAbs(long i) {
+        i = Math.abs(i); // Negative here only if the argument was Long.MIN_VALUE.
+
+        return i < 0 ? 0 : i;
+    }
+
+    /**
      * Gets wrapper class for a primitive type.
      *
      * @param cls Class. If {@code null}, method is no-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index f18615d..60db1fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -107,9 +107,6 @@ public class GridNioServer<T> {
     private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
 
     /** */
-    private static final boolean NIO_SES_BALANCE_ENABLED = IgniteSystemProperties.getBoolean("IGNITE_NIO_SES_BALANCE_ENABLED",
true);
-
-    /** */
     private static final boolean DISABLE_KEYSET_OPTIMIZATION =
         IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS);
 
@@ -213,6 +210,15 @@ public class GridNioServer<T> {
     /** Optional listener to monitor outbound message queue size. */
     private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
 
+    /** */
+    private volatile long writerMoveCnt;
+
+    /** */
+    private volatile long readerMoveCnt;
+
+    /** */
+    private final Balancer balancer;
+
     /**
      * @param addr Address.
      * @param port Port.
@@ -324,6 +330,22 @@ public class GridNioServer<T> {
         this.writerFactory = writerFactory;
 
         this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.<Message>alwaysFalse();
+
+        boolean balanceEnabled = IgniteSystemProperties.getBoolean("IGNITE_NIO_SES_BALANCE_ENABLED",
true);
+
+        Balancer balancer0 = null;
+
+        if (balanceEnabled) {
+            String balancerCls = IgniteSystemProperties.getString("IGNITE_NIO_SES_BALANCER_CLASS_NAME");
+
+            if (balancerCls != null) {
+
+            }
+            else
+                balancer0 = new SizeBasedBalancer(this);
+        }
+
+        this.balancer = balancer0;
     }
 
     /**
@@ -399,7 +421,10 @@ public class GridNioServer<T> {
 
         NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE);
 
-        clientWorkers.get(impl.selectorIndex()).offer(fut);
+        int idx = impl.selectorIndex(); // TODO
+
+        if (idx != -1)
+            clientWorkers.get(idx).offer(fut);
 
         return fut;
     }
@@ -459,9 +484,13 @@ public class GridNioServer<T> {
             if (ses.removeFuture(fut))
                 fut.connectionClosed();
         }
-        else if (msgCnt == 1)
+        else if (msgCnt == 1) {
             // Change from 0 to 1 means that worker thread should be waken up.
-            clientWorkers.get(ses.selectorIndex()).offer(fut);
+            int idx = ses.selectorIndex();
+
+            if (idx != -1) // TODO revisit
+                clientWorkers.get(idx).offer(fut);
+        }
 
         if (msgQueueLsnr != null)
             msgQueueLsnr.apply(ses, msgCnt);
@@ -925,6 +954,8 @@ public class GridNioServer<T> {
                 metricsLsnr.onBytesReceived(cnt);
 
             ses.bytesReceived(cnt);
+            ses.onBytesRead(cnt, readBuf.capacity());
+            onRead(cnt);
 
             readBuf.flip();
 
@@ -1239,6 +1270,8 @@ public class GridNioServer<T> {
                     metricsLsnr.onBytesSent(cnt);
 
                 ses.bytesSent(cnt);
+                ses.onBytesWritten(cnt, buf.capacity());
+                onWrite(cnt);
             }
             else {
                 // For test purposes only (skipWrite is set to true in tests only).
@@ -1276,6 +1309,13 @@ public class GridNioServer<T> {
         /** Worker index. */
         private final int idx;
 
+        private volatile long bytesRcvd;
+        private volatile long bytesSent;
+        private volatile long bytesRcvd0;
+        private volatile long bytesSent0;
+
+        private final GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions0 =
new GridConcurrentHashSet<>();
+
         /**
          * @param idx Index of this worker in server's array.
          * @param gridName Grid name.
@@ -1400,6 +1440,40 @@ public class GridNioServer<T> {
                                 break;
                             }
 
+                            case MOVE: {
+                                SessionMoveFuture f = (SessionMoveFuture)req;
+
+                                GridSelectorNioSessionImpl ses = f.session();
+
+                                if (idx == f.toIdx) {
+                                    ses.selectorIndex(idx);
+
+                                    sessions0.add(ses);
+
+                                    SelectionKey key = f.socketChannel().register(selector,
+                                        SelectionKey.OP_READ | SelectionKey.OP_WRITE, ses);
// TODO what if reads were paused?
+
+                                    ses.key(key);
+                                }
+                                else {
+                                    assert ses.selectorIndex() == idx; // TODO replace with
IF and ignore?
+
+                                    // Cleanup.
+                                    ses.selectorIndex(-1);
+                                    sessions0.remove(ses);
+
+                                    SelectionKey key = ses.key();
+
+                                    f.socketChannel((SocketChannel)key.channel());
+
+                                    key.cancel();
+
+                                    clientWorkers.get(f.toIndex()).offer(f);
+                                }
+
+                                break;
+                            }
+
                             case REQUIRE_WRITE: {
                                 //Just register write key.
                                 SelectionKey key = req.session().key();
@@ -1467,6 +1541,10 @@ public class GridNioServer<T> {
                                 sb.append(U.nl())
                                     .append(">> Selector info [idx=").append(idx)
                                     .append(", keysCnt=").append(keys.size())
+                                    .append(", bytesRcvd=").append(bytesRcvd)
+                                    .append(", bytesRcvd0=").append(bytesRcvd0)
+                                    .append(", bytesSent=").append(bytesSent)
+                                    .append(", bytesSent0=").append(bytesSent0)
                                     .append("]").append(U.nl());
 
                                 for (SelectionKey key : keys) {
@@ -1500,8 +1578,12 @@ public class GridNioServer<T> {
                                         sb.append(", inRecoveryDesc=null");
 
                                     sb.append(", bytesRcvd=").append(ses.bytesReceived())
+                                        .append(", bytesRcvd0=").append(ses.bytesReceived0())
                                         .append(", bytesSent=").append(ses.bytesSent())
+                                        .append(", bytesSent0=").append(ses.bytesSent0())
                                         .append(", opQueueSize=").append(ses.writeQueueSize())
+                                        .append(", writeStats=").append(Arrays.toString(ses.writeStats()))
+                                        .append(", readStats=").append(Arrays.toString(ses.readStats()))
                                         .append(", msgWriter=").append(writer != null ? writer.toString()
: "null")
                                         .append(", msgReader=").append(reader != null ? reader.toString()
: "null");
 
@@ -1764,6 +1846,7 @@ public class GridNioServer<T> {
                     resend(ses);
 
                 sessions.add(ses);
+                sessions0.add(ses);
 
                 try {
                     filterChain.onSessionOpened(ses);
@@ -1789,7 +1872,7 @@ public class GridNioServer<T> {
         }
 
         /**
-         * Closes the ses and all associated resources, then notifies the listener.
+         * Closes the session and all associated resources, then notifies the listener.
          *
          * @param ses Session to be closed.
          * @param e Exception to be passed to the listener, if any.
@@ -1806,12 +1889,10 @@ public class GridNioServer<T> {
             }
 
             sessions.remove(ses);
+            sessions0.remove(ses);
 
             SelectionKey key = ses.key();
 
-            // Shutdown input and output so that remote client will see correct socket close.
-            Socket sock = ((SocketChannel)key.channel()).socket();
-
             if (ses.setClosed()) {
                 ses.onClosed();
 
@@ -1823,6 +1904,9 @@ public class GridNioServer<T> {
                         ((DirectBuffer)ses.readBuffer()).cleaner().clean();
                 }
 
+                // Shutdown input and output so that remote client will see correct socket
close.
+                Socket sock = ((SocketChannel)key.channel()).socket();
+
                 try {
                     try {
                         sock.shutdownInput();
@@ -1906,6 +1990,24 @@ public class GridNioServer<T> {
          * @throws IOException If write failed.
          */
         protected abstract void processWrite(SelectionKey key) throws IOException;
+
+        protected void onRead(int cnt) { // TODO. Adds cnt to both received-bytes counters of this worker.
+            bytesRcvd += cnt; // Total; not cleared by reset0().
+            bytesRcvd0 += cnt; // Cleared by reset0(). NOTE(review): '+=' on a volatile is not atomic — confirm a lost update against reset0() is acceptable.
+        }
+
+        protected void onWrite(int cnt) { // Adds cnt to both sent-bytes counters of this worker.
+            bytesSent += cnt; // Total; not cleared by reset0().
+            bytesSent0 += cnt; // Cleared by reset0(). NOTE(review): '+=' on a volatile is not atomic — confirm a lost update against reset0() is acceptable.
+        }
+
+        protected void reset0() { // Zeroes the '0' counters of this worker and of every session in sessions0; totals are kept.
+            bytesSent0 = 0;
+            bytesRcvd0 = 0;
+
+            for (GridSelectorNioSessionImpl ses : sessions0) // Sessions currently registered in this worker's set.
+                ses.reset0();
+        }
     }
 
     /**
@@ -1976,12 +2078,17 @@ public class GridNioServer<T> {
          * @throws IgniteCheckedException If failed.
          */
         private void accept() throws IgniteCheckedException {
+            long lastBalance = U.currentTimeMillis();
+
             try {
                 while (!closed && selector.isOpen() && !Thread.currentThread().isInterrupted())
{
                     // Wake up every 2 seconds to check if closed.
                     if (selector.select(2000) > 0)
                         // Walk through the ready keys collection and process date requests.
                         processSelectedKeys(selector.selectedKeys());
+
+                    if (balancer != null)
+                        balancer.balance();
                 }
             }
             // Ignore this exception as thread interruption is equal to 'close' call.
@@ -2082,6 +2189,9 @@ public class GridNioServer<T> {
         /** Register read key selection. */
         REGISTER,
 
+        /** */
+        MOVE,
+
         /** Register write key selection. */
         REQUIRE_WRITE,
 
@@ -2107,7 +2217,7 @@ public class GridNioServer<T> {
 
         /** Socket channel in register request. */
         @GridToStringExclude
-        private SocketChannel sockCh;
+        protected SocketChannel sockCh; // TODO to be fixed with proper hierarchy
 
         /** Session to perform operation on. */
         @GridToStringExclude
@@ -2249,14 +2359,14 @@ public class GridNioServer<T> {
         /**
          * @return Socket channel for register request.
          */
-        private SocketChannel socketChannel() {
+        SocketChannel socketChannel() {
             return sockCh;
         }
 
         /**
          * @return Session for this change request.
          */
-        private GridSelectorNioSessionImpl session() {
+        GridSelectorNioSessionImpl session() {
             return ses;
         }
 
@@ -2303,6 +2413,41 @@ public class GridNioServer<T> {
     }
 
     /**
+     * Request to move a session to the client worker with index {@code toIdx}.
+     */
+    private static class SessionMoveFuture<R> extends NioOperationFuture<R> {
+        /** Index of the worker the session is moved to. */
+        private final int toIdx;
+
+        /**
+         * @param ses Session to move.
+         * @param toIdx Index of the worker that should take over the session.
+         */
+        public SessionMoveFuture(
+            GridSelectorNioSessionImpl ses,
+            int toIdx
+        ) {
+            super(ses, NioOperation.MOVE);
+
+            // The socket channel is not known yet; it is supplied later through socketChannel(SocketChannel).
+            this.toIdx = toIdx;
+        }
+
+        int toIndex() { // Index of the destination worker.
+            return toIdx;
+        }
+
+        void socketChannel(SocketChannel sockCh) { // Stores the channel to be re-registered by the destination worker.
+            this.sockCh = sockCh;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SessionMoveFuture.class, this, super.toString());
+        }
+    }
+
+    /**
      * Filter forwarding messages from chain's head to this server.
      */
     private class HeadFilter extends GridNioFilterAdapter {
@@ -2708,4 +2853,194 @@ public class GridNioServer<T> {
             return this;
         }
     }
+
+    /**
+     * Strategy for redistributing sessions between the server's client workers.
+     */
+    public interface Balancer {
+        /**
+         * Runs one balancing attempt; the server calls it on each pass of the acceptor's select loop when a balancer is set.
+         */
+        void balance();
+    }
+
+    /**
+     * Balancer that compares bytes transferred by client workers since the last pass and moves at most one session per direction.
+     */
+    private static class SizeBasedBalancer implements Balancer {
+        /** Server whose client workers are balanced. */
+        private final GridNioServer<?> srv;
+
+        /** Logger taken from the server. */
+        private final IgniteLogger log;
+
+        /** Time of the last balancing pass, as returned by {@code U.currentTimeMillis()}. */
+        private long lastBalance;
+
+        /**
+         * @param srv Server.
+         */
+        public SizeBasedBalancer(GridNioServer<?> srv) {
+            this.srv = srv;
+
+            log = srv.log;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void balance() {
+            long now = U.currentTimeMillis();
+
+            if (lastBalance + 5000 < now) { // Skip unless more than 5000 time units (presumably ms) passed since the last pass.
+                lastBalance = now;
+
+                long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = -1;
+                int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, minSentIdx = -1;
+
+                boolean print = Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4"); // NOTE(review): test-only diagnostics keyed on the thread name.
+
+                List<GridNioServer.AbstractNioClientWorker> clientWorkers = (List)srv.clientWorkers;
+
+                for (int i = 0; i < clientWorkers.size(); i++) {
+                    GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+                    if ((i & 1) == 0) {
+                        // Reader.
+                        long bytesRcvd0 = worker.bytesRcvd0;
+
+                        if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) && bytesRcvd0 > 0 &&
+                            worker.sessions0.size() > 1) {
+                            maxRcvd0 = bytesRcvd0;
+                            maxRcvdIdx = i;
+
+                            continue;
+                        }
+
+                        if (minRcvd0 == -1 || bytesRcvd0 < minRcvd0) {
+                            minRcvd0 = bytesRcvd0;
+                            minRcvdIdx = i;
+                        }
+                    }
+                    else {
+                        // Writer.
+                        long bytesSent0 = worker.bytesSent0;
+
+                        if ((maxSent0 == -1 || bytesSent0 > maxSent0) && bytesSent0 > 0 &&
+                            worker.sessions0.size() > 1) {
+                            maxSent0 = bytesSent0;
+                            maxSentIdx = i;
+
+                            continue;
+                        }
+
+                        if (minSent0 == -1 || bytesSent0 < minSent0) {
+                            minSent0 = bytesSent0;
+                            minSentIdx = i;
+                        }
+                    }
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Balancing data [minSent0=" + minSent0 + ", minSentIdx=" + minSentIdx +
+                        ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx +
+                        ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx +
+                        ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx + ']');
+
+                if (print)
+                    log.info("Balancing data [minSent0=" + minSent0 + ", minSentIdx=" + minSentIdx +
+                        ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx +
+                        ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx +
+                        ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx + ']');
+
+                if (maxSent0 != -1 && minSent0 != -1) {
+                    GridSelectorNioSessionImpl ses = null;
+
+                    long sentDiff = maxSent0 - minSent0;
+                    long delta = sentDiff;
+                    double threshold = sentDiff * 0.9;
+
+                    GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
+                        clientWorkers.get(maxSentIdx).sessions0;
+
+                    for (GridSelectorNioSessionImpl ses0 : sessions) { // Pick the session whose traffic is closest to half of the difference.
+                        long bytesSent0 = ses0.bytesSent0();
+
+                        if (bytesSent0 < threshold &&
+                            (ses == null || delta > U.safeAbs(bytesSent0 - sentDiff / 2))) {
+                            ses = ses0;
+                            delta = U.safeAbs(bytesSent0 - sentDiff / 2);
+                        }
+                    }
+
+                    if (ses != null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Will move session to less loaded writer [ses=" + ses +
+                                ", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
+
+                        if (print)
+                            log.info("Will move session to less loaded writer [diff=" + sentDiff + ", ses=" + ses +
+                                ", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
+
+                        srv.writerMoveCnt++;
+
+                        clientWorkers.get(maxSentIdx).offer(new SessionMoveFuture(ses, minSentIdx));
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Unable to find session to move for writers.");
+
+                        if (print)
+                            log.info("Unable to find session to move for writers.");
+                    }
+                }
+
+                if (maxRcvd0 != -1 && minRcvd0 != -1) {
+                    GridSelectorNioSessionImpl ses = null;
+
+                    long rcvdDiff = maxRcvd0 - minRcvd0;
+                    long delta = rcvdDiff;
+                    double threshold = rcvdDiff * 0.9;
+
+                    GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
+                        clientWorkers.get(maxRcvdIdx).sessions0;
+
+                    for (GridSelectorNioSessionImpl ses0 : sessions) { // Pick the session whose traffic is closest to half of the difference.
+                        long bytesRcvd0 = ses0.bytesReceived0();
+
+                        if (bytesRcvd0 < threshold &&
+                            (ses == null || delta > U.safeAbs(bytesRcvd0 - rcvdDiff / 2))) {
+                            ses = ses0;
+                            delta = U.safeAbs(bytesRcvd0 - rcvdDiff / 2);
+                        }
+                    }
+
+                    if (ses != null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Will move session to less loaded reader [ses=" + ses +
+                                ", from=" + maxRcvdIdx + ", to=" + minRcvdIdx + ']');
+
+                        if (print)
+                            log.info("Will move session to less loaded reader [diff=" + rcvdDiff + ", ses=" + ses +
+                                ", from=" + maxRcvdIdx + ", to=" + minRcvdIdx + ']');
+
+                        srv.readerMoveCnt++;
+
+                        clientWorkers.get(maxRcvdIdx).offer(new SessionMoveFuture(ses, minRcvdIdx));
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Unable to find session to move for readers.");
+
+                        if (print)
+                            log.info("Unable to find session to move for readers.");
+                    }
+                }
+
+                for (int i = 0; i < clientWorkers.size(); i++) { // Clear the per-pass counters of every worker.
+                    GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+                    worker.reset0();
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 53a624d..3f5d367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -45,12 +45,21 @@ public class GridNioSessionImpl implements GridNioSession {
     /** Session close timestamp. */
     private final AtomicLong closeTime = new AtomicLong();
 
+    private final long[] writesStat = new long[25];
+    private final long[] readsStat = new long[25];
+
     /** Sent bytes counter. */
     private volatile long bytesSent;
 
     /** Received bytes counter. */
     private volatile long bytesRcvd;
 
+    /** Sent bytes counter since last re-balancing. */
+    private volatile long bytesSent0;
+
+    /** Received bytes counter since last re-balancing. */
+    private volatile long bytesRcvd0;
+
     /** Last send schedule timestamp. */
     private volatile long sndSchedTime;
 
@@ -163,6 +172,19 @@ public class GridNioSessionImpl implements GridNioSession {
         return bytesRcvd;
     }
 
+    public long bytesSent0() { // Bytes sent since the last re-balancing, i.e. since reset0() was last called.
+        return bytesSent0;
+    }
+
+    public long bytesReceived0() { // Bytes received since the last re-balancing, i.e. since reset0() was last called.
+        return bytesRcvd0;
+    }
+
+    public void reset0() { // Zeroes only the since-last-re-balancing counters; the bytesSent/bytesRcvd totals are untouched.
+        bytesSent0 = 0;
+        bytesRcvd0 = 0;
+    }
+
     /** {@inheritDoc} */
     @Override public long createTime() {
         return createTime;
@@ -240,10 +262,37 @@ public class GridNioSessionImpl implements GridNioSession {
      */
     public void bytesSent(int cnt) {
         bytesSent += cnt;
+        bytesSent0 += cnt;
 
         lastSndTime = U.currentTimeMillis();
     }
 
+    public void onBytesWritten(int cnt, int bufCap) { // Counts one write in the bucket matching cnt / bufCap.
+        double fillRatio = (cnt * 1.0) / bufCap;
+        int lastBucket = writesStat.length - 1;
+
+        // A completely filled buffer would index one past the end, so it is folded into the last bucket.
+        int bucket = Math.min((int)Math.floor(fillRatio * writesStat.length), lastBucket);
+
+        writesStat[bucket]++;
+    }
+
+    public void onBytesRead(int cnt, int bufCap) { // Counts one read in the bucket matching cnt / bufCap.
+        double fillRatio = (cnt * 1.0) / bufCap;
+        int lastBucket = readsStat.length - 1;
+
+        // A completely filled buffer would index one past the end, so it is folded into the last bucket.
+        int bucket = Math.min((int)Math.floor(fillRatio * readsStat.length), lastBucket);
+
+        readsStat[bucket]++;
+    }
+
+    public long[] readStats() { // Returns the internal array itself (no copy); buckets are filled by onBytesRead().
+        return readsStat;
+    }
+
+    public long[] writeStats() { // Returns the internal array itself (no copy); buckets are filled by onBytesWritten().
+        return writesStat;
+    }
+
     /**
      * Adds given amount ob bytes to the received bytes counter.
      * <p>
@@ -253,6 +302,7 @@ public class GridNioSessionImpl implements GridNioSession {
      */
     public void bytesReceived(int cnt) {
         bytesRcvd += cnt;
+        bytesRcvd0 += cnt;
 
         lastRcvTime = U.currentTimeMillis();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index a680a33..8e5b93d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -44,7 +44,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     private SelectionKey key;
 
     /** Worker index for server */
-    private final int selectorIdx;
+    private volatile int selectorIdx;
 
     /** Size counter. */
     private final AtomicInteger queueSize = new AtomicInteger();
@@ -161,6 +161,13 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     }
 
     /**
+     * @param selectorIdx Selector index; {@code GridNioServer} sets {@code -1} while the session is being moved between workers.
+     */
+    void selectorIndex(int selectorIdx) {
+        this.selectorIdx = selectorIdx;
+    }
+
+    /**
      * Adds write future at the front of the queue without acquiring back pressure semaphore.
      *
      * @param writeFut Write request.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
new file mode 100644
index 0000000..30a6254
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.log4j.helpers.ThreadLocalMap;
+
+/**
+ * Manual check of communication session balancing: runs a no-op job on grids 0..3 from a client, then dumps debug info in a loop.
+ */
+public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
+    /** IP finder installed into the discovery SPI of every started grid. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Client mode flag copied into each configuration created by {@code getConfiguration}. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpCommunicationSpi commSpi = ((TcpCommunicationSpi)cfg.getCommunicationSpi());
+
+        commSpi.setSharedMemoryPort(-1);
+        commSpi.setSelectorsCount(4);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBalance() throws Exception {
+        startGrid(0);
+
+        client = true; // NOTE(review): never reset, so grids 1..3 started below presumably also get client mode — confirm intended.
+
+        Ignite client = startGrid(4);
+
+        startGridsMultiThreaded(1, 3);
+
+        for (int i = 0; i < 4; i++) { // One no-op job per grid 0..3, sent from the client.
+            ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
+
+            client.compute(client.cluster().forNode(node)).run(new DummyRunnable());
+        }
+
+//        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+//
+//        for (int iter = 0; iter < 10; iter++) {
+//            log.info("Iteration: " + iter);
+//
+//            int nodeIdx = rnd.nextInt(4);
+//
+//            ClusterNode node = client.cluster().node(ignite(nodeIdx).cluster().localNode().id());
+//
+//            for (int i = 0; i < 10_000; i++)
+//                client.compute(client.cluster().forNode(node)).run(new DummyRunnable());
+//
+//            U.sleep(5000);
+//        }
+
+        while (true) { // NOTE(review): never terminates, so this test cannot finish on its own — looks like a manual diagnostic run.
+            ((IgniteKernal) client).dumpDebugInfo();
+
+            Thread.sleep(5000);
+        }
+
+        //Thread.sleep(Long.MAX_VALUE);
+    }
+
+    /**
+     * Job that does nothing; used only to generate communication traffic.
+     */
+    static class DummyRunnable implements IgniteRunnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            // No-op.
+        }
+    }
+}


Mime
View raw message