ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [4/7] ignite git commit: IO opts : added simple worker rebalancing + limited default selectors to 8
Date Fri, 09 Sep 2016 14:01:52 GMT
IO opts : added simple worker rebalancing + limited default selectors to 8


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9113dd8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9113dd8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9113dd8

Branch: refs/heads/ignite-comm-opts1
Commit: c9113dd8edbce931cb1f86586d0cfceb2bba7fb1
Parents: 9d3cc2d
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Thu Sep 8 15:30:47 2016 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Thu Sep 8 15:30:47 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/IgniteUtils.java       |  12 +
 .../ignite/internal/util/nio/GridNioServer.java | 278 ++++++++++++++++++-
 .../internal/util/nio/GridNioSessionImpl.java   |  27 +-
 .../util/nio/GridSelectorNioSessionImpl.java    |  11 +-
 .../communication/tcp/TcpCommunicationSpi.java  |   2 +-
 5 files changed, 314 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c9113dd8/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e3389d5..f8e1d86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -8297,6 +8297,18 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Gets absolute value for long. If argument is {@link Long#MIN_VALUE}, then {@code 0} is returned.
+     *
+     * @param i Argument.
+     * @return Absolute value.
+     */
+    public static long safeAbs(long i) {
+        i = Math.abs(i); // Stays negative only for Long.MIN_VALUE, whose absolute value does not fit in a long.
+
+        return i < 0 ? 0 : i;
+    }
+
+    /**
      * Gets wrapper class for a primitive type.
      *
      * @param cls Class. If {@code null}, method is no-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9113dd8/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index c67619e..0b94730 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -108,6 +108,7 @@ public class GridNioServer<T> {
     // TODO
     private static final int WRITE_BUF_SIZE = IgniteSystemProperties.getInteger("IGNITE_WRITE_BUF_SIZE",
65536);
     private static final int READ_BUF_SIZE = IgniteSystemProperties.getInteger("IGNITE_READ_BUF_SIZE",
65536);
+    private static final boolean NIO_SES_BALANCE_ENABLED = IgniteSystemProperties.getBoolean("IGNITE_NIO_SES_BALANCE_ENABLED",
true);
 
     /** */
     private static final boolean DISABLE_KEYSET_OPTIMIZATION =
@@ -399,7 +400,10 @@ public class GridNioServer<T> {
 
         NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE);
 
-        clientWorkers.get(impl.selectorIndex()).offer(fut);
+        int idx = impl.selectorIndex(); // TODO
+
+        if (idx != -1)
+            clientWorkers.get(idx).offer(fut);
 
         return fut;
     }
@@ -459,9 +463,13 @@ public class GridNioServer<T> {
             if (ses.removeFuture(fut))
                 fut.connectionClosed();
         }
-        else if (msgCnt == 1)
+        else if (msgCnt == 1) {
             // Change from 0 to 1 means that worker thread should be waken up.
-            clientWorkers.get(ses.selectorIndex()).offer(fut);
+            int idx = ses.selectorIndex();
+
+            if (idx != -1) // -1 marks a session detached from its worker while it is being moved. TODO revisit
+                clientWorkers.get(idx).offer(fut);
+        }
 
         if (msgQueueLsnr != null)
             msgQueueLsnr.apply(ses, msgCnt);
@@ -921,6 +929,7 @@ public class GridNioServer<T> {
 
             ses.bytesReceived(cnt);
             ses.onBytesRead(cnt, readBuf.capacity());
+            onRead(cnt);
 
             readBuf.flip();
 
@@ -1236,6 +1245,7 @@ public class GridNioServer<T> {
 
                 ses.bytesSent(cnt);
                 ses.onBytesWritten(cnt, buf.capacity());
+                onWrite(cnt);
             }
             else {
                 // For test purposes only (skipWrite is set to true in tests only).
@@ -1273,6 +1283,13 @@ public class GridNioServer<T> {
         /** Worker index. */
         private final int idx;
 
+        private volatile long bytesRcvd;
+        private volatile long bytesSent;
+        private volatile long bytesRcvd0;
+        private volatile long bytesSent0;
+
+        private final GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions0 =
new GridConcurrentHashSet<>();
+
         /**
          * @param idx Index of this worker in server's array.
          * @param gridName Grid name.
@@ -1397,6 +1414,40 @@ public class GridNioServer<T> {
                                 break;
                             }
 
+                            case MOVE: {
+                                SessionMoveFuture f = (SessionMoveFuture)req;
+
+                                GridSelectorNioSessionImpl ses = f.session();
+
+                                if (idx == f.toIdx) { // This worker is the move target: adopt the session.
+                                    ses.selectorIndex(idx);
+
+                                    sessions0.add(ses);
+
+                                    SelectionKey key = f.socketChannel().register(selector,
+                                        SelectionKey.OP_READ | SelectionKey.OP_WRITE, ses); // TODO what if reads were paused?
+
+                                    ses.key(key);
+                                }
+                                else { // This worker is expected to own the session: detach it and hand it over.
+                                    assert ses.selectorIndex() == idx; // TODO replace with IF and ignore?
+
+                                    // Cleanup.
+                                    ses.selectorIndex(-1);
+                                    sessions0.remove(ses);
+
+                                    SelectionKey key = ses.key();
+
+                                    f.socketChannel((SocketChannel)key.channel()); // Target worker registers this channel.
+
+                                    key.cancel();
+
+                                    clientWorkers.get(f.toIndex()).offer(f); // Forward the same request to the target.
+                                }
+
+                                break;
+                            }
+
                             case REQUIRE_WRITE: {
                                 //Just register write key.
                                 SelectionKey key = req.session().key();
@@ -1464,6 +1515,10 @@ public class GridNioServer<T> {
                                 sb.append(U.nl())
                                     .append(">> Selector info [idx=").append(idx)
                                     .append(", keysCnt=").append(keys.size())
+                                    .append(", bytesRcvd=").append(bytesRcvd)
+                                    .append(", bytesRcvd0=").append(bytesRcvd0)
+                                    .append(", bytesSent=").append(bytesSent)
+                                    .append(", bytesSent0=").append(bytesSent0)
                                     .append("]").append(U.nl());
 
                                 for (SelectionKey key : keys) {
@@ -1488,7 +1543,9 @@ public class GridNioServer<T> {
                                         sb.append(", recoveryDesc=null");
 
                                     sb.append(", bytesRcvd=").append(ses.bytesReceived())
+                                        .append(", bytesRcvd0=").append(ses.bytesReceived0())
                                         .append(", bytesSent=").append(ses.bytesSent())
+                                        .append(", bytesSent0=").append(ses.bytesSent0())
                                         .append(", opQueueSize=").append(ses.writeQueueSize())
                                         .append(", writeStats=").append(Arrays.toString(ses.writeStats()))
                                         .append(", readStats=").append(Arrays.toString(ses.readStats()))
@@ -1754,6 +1811,7 @@ public class GridNioServer<T> {
                     resend(ses);
 
                 sessions.add(ses);
+                sessions0.add(ses);
 
                 try {
                     filterChain.onSessionOpened(ses);
@@ -1779,7 +1837,7 @@ public class GridNioServer<T> {
         }
 
         /**
-         * Closes the ses and all associated resources, then notifies the listener.
+         * Closes the session and all associated resources, then notifies the listener.
          *
          * @param ses Session to be closed.
          * @param e Exception to be passed to the listener, if any.
@@ -1796,12 +1854,10 @@ public class GridNioServer<T> {
             }
 
             sessions.remove(ses);
+            sessions0.remove(ses);
 
             SelectionKey key = ses.key();
 
-            // Shutdown input and output so that remote client will see correct socket close.
-            Socket sock = ((SocketChannel)key.channel()).socket();
-
             if (ses.setClosed()) {
                 ses.onClosed();
 
@@ -1813,6 +1869,9 @@ public class GridNioServer<T> {
                         ((DirectBuffer)ses.readBuffer()).cleaner().clean();
                 }
 
+                // Shutdown input and output so that remote client will see correct socket
close.
+                Socket sock = ((SocketChannel)key.channel()).socket();
+
                 try {
                     try {
                         sock.shutdownInput();
@@ -1891,6 +1950,24 @@ public class GridNioServer<T> {
          * @throws IOException If write failed.
          */
         protected abstract void processWrite(SelectionKey key) throws IOException;
+
+        protected void onRead(int cnt) { // TODO
+            bytesRcvd += cnt; // Running total for this worker.
+            bytesRcvd0 += cnt; // Bytes received since the last reset0() call.
+        }
+
+        protected void onWrite(int cnt) {
+            bytesSent += cnt; // Running total for this worker.
+            bytesSent0 += cnt; // Bytes sent since the last reset0() call.
+        }
+
+        protected void reset0() {
+            bytesSent0 = 0;
+            bytesRcvd0 = 0;
+            // NOTE(review): invoked from the balancing loop; the non-atomic += above may race with it - confirm.
+            for (GridSelectorNioSessionImpl ses : sessions0)
+                ses.reset0();
+        }
     }
 
     /**
@@ -1961,12 +2038,155 @@ public class GridNioServer<T> {
          * @throws IgniteCheckedException If failed.
          */
         private void accept() throws IgniteCheckedException {
+            long lastBalance = U.currentTimeMillis();
+
             try {
                 while (!closed && selector.isOpen() && !Thread.currentThread().isInterrupted())
{
                     // Wake up every 2 seconds to check if closed.
                     if (selector.select(2000) > 0)
                         // Walk through the ready keys collection and process date requests.
                         processSelectedKeys(selector.selectedKeys());
+
+                    if (!NIO_SES_BALANCE_ENABLED)
+                        continue;
+
+                    long now = U.currentTimeMillis();
+
+                    if (lastBalance + 5000 < now) {
+                        lastBalance = now;
+
+                        long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = -1;
+                        int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, minSentIdx
= -1;
+
+                        if(sessions.size() > 2)
+                            System.out.println("");
+
+                        for (int i = 0; i < clientWorkers.size(); i++) {
+                            AbstractNioClientWorker worker = clientWorkers.get(i);
+
+                            if ((i & 1) == 0) {
+                                // Reader.
+                                long bytesRcvd0 = worker.bytesRcvd0;
+
+                                if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) &&
bytesRcvd0 > 0 &&
+                                    worker.sessions0.size() > 1) {
+                                    maxRcvd0 = bytesRcvd0;
+                                    maxRcvdIdx = i;
+
+                                    continue;
+                                }
+
+                                if (minRcvd0 == -1 || bytesRcvd0 < minRcvd0) {
+                                    minRcvd0 = bytesRcvd0;
+                                    minRcvdIdx = i;
+                                }
+                            }
+                            else {
+                                // Writer.
+                                long bytesSent0 = worker.bytesSent0;
+
+                                if ((maxSent0 == -1 || bytesSent0 > maxSent0) &&
bytesSent0 > 0 &&
+                                    worker.sessions0.size() > 1) {
+                                    maxSent0 = bytesSent0;
+                                    maxSentIdx = i;
+
+                                    continue;
+                                }
+
+                                if (minSent0 == -1 || bytesSent0 < minSent0) {
+                                    minSent0 = bytesSent0;
+                                    minSentIdx = i;
+                                }
+                            }
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Balancing data [minSent0=" + minSent0 + ", minSentIdx="
+ minSentIdx +
+                                ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx +
+                                ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx +
+                                ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx +
']');
+
+                        U.debug("Balancing data [minSent0=" + minSent0 + ", minSentIdx="
+ minSentIdx +
+                                ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx +
+                                ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx +
+                                ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx +
']');
+
+                        if (maxSent0 != -1 && minSent0 != -1) {
+                            GridSelectorNioSessionImpl ses = null;
+
+                            long sentDiff = maxSent0 - minSent0;
+                            long delta = sentDiff; // Distance to sentDiff / 2 of the best candidate so far.
+                            double threshold = sentDiff * 0.9; // Sessions at or above 90% of the gap are skipped.
+
+                            for (GridSelectorNioSessionImpl ses0 : clientWorkers.get(maxSentIdx).sessions0) {
+                                long bytesSent0 = ses0.bytesSent0();
+
+                                if (bytesSent0 < threshold &&
+                                    (ses == null || delta > U.safeAbs(bytesSent0 - sentDiff / 2))) {
+                                    ses = ses0;
+                                    delta = U.safeAbs(bytesSent0 - sentDiff / 2);
+                                }
+                            }
+
+                            if (ses != null) { // Enqueue the move only when a session was picked.
+                                if (log.isDebugEnabled())
+                                    log.debug("Will move session to less loaded writer [ses=" + ses +
+                                        ", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
+
+                                U.debug(log, "Will move session to less loaded writer [ses=" + ses +
+                                    ", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
+
+                                clientWorkers.get(maxSentIdx).offer(new SessionMoveFuture(ses, minSentIdx));
+                            }
+                            else {
+                                if (log.isDebugEnabled())
+                                    log.debug("Unable to find session to move for writers.");
+
+                                U.debug(log, "Unable to find session to move for writers.");
+                            }
+                        }
+
+                        if (maxRcvd0 != -1 && minRcvd0 != -1) {
+                            GridSelectorNioSessionImpl ses = null;
+
+                            long rcvdDiff = maxRcvd0 - minRcvd0;
+                            long delta = rcvdDiff; // Distance to rcvdDiff / 2 of the best candidate so far.
+                            double threshold = rcvdDiff * 0.9; // Sessions at or above 90% of the gap are skipped.
+
+                            for (GridSelectorNioSessionImpl ses0 : clientWorkers.get(maxRcvdIdx).sessions0) {
+                                long bytesRcvd0 = ses0.bytesReceived0();
+
+                                if (bytesRcvd0 < threshold &&
+                                    (ses == null || delta > U.safeAbs(bytesRcvd0 - rcvdDiff / 2))) {
+                                    ses = ses0;
+                                    delta = U.safeAbs(bytesRcvd0 - rcvdDiff / 2);
+                                }
+                            }
+
+                            if (ses != null) { // Enqueue the move only when a session was picked.
+                                if (log.isDebugEnabled())
+                                    log.debug("Will move session to less loaded reader [ses=" + ses +
+                                        ", from=" + maxRcvdIdx + ", to=" + minRcvdIdx + ']');
+
+                                U.debug(log, "Will move session to less loaded reader [ses=" + ses +
+                                    ", from=" + maxRcvdIdx + ", to=" + minRcvdIdx + ']');
+
+                                clientWorkers.get(maxRcvdIdx).offer(new SessionMoveFuture(ses, minRcvdIdx));
+                            }
+                            else {
+                                if (log.isDebugEnabled())
+                                    log.debug("Unable to find session to move for readers.");
+
+                                U.debug(log, "Unable to find session to move for readers.");
+                            }
+                        }
+
+                        for (int i = 0; i < clientWorkers.size(); i++) {
+                            AbstractNioClientWorker worker = clientWorkers.get(i);
+
+                            worker.reset0();
+                        }
+                    }
                 }
             }
             // Ignore this exception as thread interruption is equal to 'close' call.
@@ -2067,6 +2287,9 @@ public class GridNioServer<T> {
         /** Register read key selection. */
         REGISTER,
 
+        /** */
+        MOVE,
+
         /** Register write key selection. */
         REQUIRE_WRITE,
 
@@ -2092,7 +2315,7 @@ public class GridNioServer<T> {
 
         /** Socket channel in register request. */
         @GridToStringExclude
-        private SocketChannel sockCh;
+        protected SocketChannel sockCh; // TODO to be fixed with proper hierarchy
 
         /** Session to perform operation on. */
         @GridToStringExclude
@@ -2234,14 +2457,14 @@ public class GridNioServer<T> {
         /**
          * @return Socket channel for register request.
          */
-        private SocketChannel socketChannel() {
+        SocketChannel socketChannel() {
             return sockCh;
         }
 
         /**
          * @return Session for this change request.
          */
-        private GridSelectorNioSessionImpl session() {
+        GridSelectorNioSessionImpl session() {
             return ses;
         }
 
@@ -2288,6 +2511,41 @@ public class GridNioServer<T> {
     }
 
     /**
+     * Request to move a session from its current client worker to the worker with index {@code toIdx}.
+     */
+    private static class SessionMoveFuture<R> extends NioOperationFuture<R> {
+        /** Index of the worker the session is moved to. */
+        private final int toIdx;
+
+        /**
+         * @param ses Session to move.
+         * @param toIdx Index of the worker the session is moved to.
+         */
+        public SessionMoveFuture(GridSelectorNioSessionImpl ses, int toIdx) {
+            super(ses, NioOperation.MOVE);
+
+            // The socket channel is not known at creation time.
+            // It is supplied later through socketChannel(SocketChannel).
+            this.toIdx = toIdx;
+        }
+
+        /** @return Index of the worker the session is moved to. */
+        int toIndex() {
+            return toIdx;
+        }
+
+        /** @param sockCh Socket channel of the session being moved. */
+        void socketChannel(SocketChannel sockCh) {
+            this.sockCh = sockCh;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SessionMoveFuture.class, this, super.toString());
+        }
+    }
+
+    /**
      * Filter forwarding messages from chain's head to this server.
      */
     private class HeadFilter extends GridNioFilterAdapter {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9113dd8/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 99a61da..5d023ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -45,15 +45,21 @@ public class GridNioSessionImpl implements GridNioSession {
     /** Session close timestamp. */
     private final AtomicLong closeTime = new AtomicLong();
 
-    /** Sent bytes counter. */
-    private volatile long bytesSent;
-
     private final long[] writesStat = new long[25];
     private final long[] readsStat = new long[25];
 
+    /** Sent bytes counter. */
+    private volatile long bytesSent;
+
     /** Received bytes counter. */
     private volatile long bytesRcvd;
 
+    /** Sent bytes counter since last re-balancing. */
+    private volatile long bytesSent0;
+
+    /** Received bytes counter since last re-balancing. */
+    private volatile long bytesRcvd0;
+
     /** Last send schedule timestamp. */
     private volatile long sndSchedTime;
 
@@ -166,6 +172,19 @@ public class GridNioSessionImpl implements GridNioSession {
         return bytesRcvd;
     }
 
+    public long bytesSent0() { // Bytes sent since the last reset0() call.
+        return bytesSent0;
+    }
+
+    public long bytesReceived0() { // Bytes received since the last reset0() call.
+        return bytesRcvd0;
+    }
+
+    public void reset0() { // Zeroes only the "0" counters; the bytesSent/bytesRcvd totals are not touched here.
+        bytesSent0 = 0;
+        bytesRcvd0 = 0;
+    }
+
     /** {@inheritDoc} */
     @Override public long createTime() {
         return createTime;
@@ -243,6 +262,7 @@ public class GridNioSessionImpl implements GridNioSession {
      */
     public void bytesSent(int cnt) {
         bytesSent += cnt;
+        bytesSent0 += cnt;
 
         lastSndTime = U.currentTimeMillis();
     }
@@ -282,6 +302,7 @@ public class GridNioSessionImpl implements GridNioSession {
      */
     public void bytesReceived(int cnt) {
         bytesRcvd += cnt;
+        bytesRcvd0 += cnt;
 
         lastRcvTime = U.currentTimeMillis();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9113dd8/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 487b11a..d989ed6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -44,7 +44,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     private SelectionKey key;
 
     /** Worker index for server */
-    private final int selectorIdx;
+    private volatile int selectorIdx;
 
     /** Size counter. */
     private final AtomicInteger queueSize = new AtomicInteger();
@@ -124,7 +124,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param key Selection key.
      */
     void key(SelectionKey key) {
-        assert this.key == null;
+        assert key != null;
 
         this.key = key;
     }
@@ -158,6 +158,13 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     }
 
     /**
+     * @param selectorIdx Selector index.
+     */
+    void selectorIndex(int selectorIdx) {
+        this.selectorIdx = selectorIdx;
+    }
+
+    /**
      * Adds write future at the front of the queue without acquiring back pressure semaphore.
      *
      * @param writeFut Write request.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9113dd8/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index d12c6bf..3292412 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -283,7 +283,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * Default count of selectors for TCP server equals to
      * {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}.
      */
-    public static final int DFLT_SELECTORS_CNT = Runtime.getRuntime().availableProcessors();
+    public static final int DFLT_SELECTORS_CNT = Math.min(8, Runtime.getRuntime().availableProcessors());
 
     /** Node ID meta for session. */
     private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();


Mime
View raw message