ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [50/50] [abbrv] ignite git commit: io
Date Mon, 10 Oct 2016 14:57:55 GMT
io


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d37469c1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d37469c1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d37469c1

Branch: refs/heads/ignite-gg-8-io2-park
Commit: d37469c1cc103f397a20f04e36b8e031641adc4a
Parents: 67c4af5
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Oct 10 15:08:28 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Oct 10 17:50:54 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java | 97 +++++++++++++++-----
 1 file changed, 75 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d37469c1/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 8006a2c..6aea84f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -45,6 +45,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -406,7 +407,7 @@ public class GridNioServer<T> {
 
         NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE);
 
-        clientWorkers.get(impl.selectorIndex()).offer(fut);
+        clientWorkers.get(impl.selectorIndex()).offer(fut, impl.selectorIdx);
 
         return fut;
     }
@@ -497,9 +498,8 @@ public class GridNioServer<T> {
             }
         }
         else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true)) {
-            ses.worker.offer((SessionChangeRequest) fut);
-
-            ses.wakeupCnt.increment();
+            if (ses.worker.offer((SessionChangeRequest)fut, ses.selectorIdx))
+                ses.wakeupCnt.increment();
         }
 
         if (msgQueueLsnr != null)
@@ -574,7 +574,7 @@ public class GridNioServer<T> {
             ses0.resend(futs);
 
             // Wake up worker.
-            clientWorkers.get(ses0.selectorIndex()).offer(((SessionChangeRequest)fut0));
+            clientWorkers.get(ses0.selectorIndex()).offer(((SessionChangeRequest)fut0), ses0.selectorIdx);
         }
     }
 
@@ -602,7 +602,7 @@ public class GridNioServer<T> {
 
         NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, op);
 
-        clientWorkers.get(impl.selectorIndex()).offer(fut);
+        clientWorkers.get(impl.selectorIndex()).offer(fut, impl.selectorIdx);
 
         return fut;
     }
@@ -612,7 +612,7 @@ public class GridNioServer<T> {
      */
     public void dumpStats() {
         for (int i = 0; i < clientWorkers.size(); i++)
-            clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS));
+            clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS), i);
     }
 
     /**
@@ -759,7 +759,7 @@ public class GridNioServer<T> {
         else
             balanceIdx = 0;
 
-        clientWorkers.get(balanceIdx).offer(req);
+        clientWorkers.get(balanceIdx).offer(req, balanceIdx);
     }
 
     /** {@inheritDoc} */
@@ -1300,8 +1300,15 @@ public class GridNioServer<T> {
 
                             assert set;
 
-                            if (ses.writeQueue().isEmpty())
-                                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                            if (ses.writeQueue().isEmpty()) {
+                                if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) {
+                                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+                                    writeSesCnt--;
+
+                                    assert writeSesCnt >= 0 : writeSesCnt;
+                                }
+                            }
                             else
                                 ses.procWrite.set(true);
                         }
@@ -1410,7 +1417,17 @@ public class GridNioServer<T> {
         /** Worker index. */
         private final int idx;
 
-        public final boolean wakeup;
+        /** */
+        private final boolean writer;
+
+        /** */
+        volatile boolean select;
+
+        /** */
+        int writeSesCnt;
+
+        /** */
+        volatile boolean park;
 
         /**
          * @param idx Index of this worker in server's array.
@@ -1427,7 +1444,7 @@ public class GridNioServer<T> {
 
             this.idx = idx;
 
-            wakeup = idx % 2 == 0;
+            writer = idx % 2 == 1;
         }
 
         /** {@inheritDoc} */
@@ -1511,10 +1528,18 @@ public class GridNioServer<T> {
          *
          * @param req Change request.
          */
-        private void offer(SessionChangeRequest req) {
+        private boolean offer(SessionChangeRequest req, int idx) {
             changeReqs.offer(req);
 
-            selector.wakeup();
+            if (select) {
+                selector.wakeup();
+
+                return true;
+            }
+            else if (park)
+                LockSupport.unpark(clientThreads[idx]);
+
+            return false;
         }
 
         /**
@@ -1611,13 +1636,38 @@ public class GridNioServer<T> {
                         }
                     }
 
-                    // Wake up every 2 seconds to check if closed.
-                    if (selector.select(2000) > 0) {
-                        // Walk through the ready keys collection and process network events.
-                        if (selectedKeys == null)
-                            processSelectedKeys(selector.selectedKeys());
-                        else
-                            processSelectedKeysOptimized(selectedKeys.flip());
+                    if (!writer || writeSesCnt > 0) {
+                        select = true;
+
+                        try {
+                            // Wake up every 2 seconds to check if closed.
+                            if (changeReqs.isEmpty() && selector.select(2000) > 0) {
+                                // Walk through the ready keys collection and process network events.
+                                if (selectedKeys == null)
+                                    processSelectedKeys(selector.selectedKeys());
+                                else
+                                    processSelectedKeysOptimized(selectedKeys.flip());
+                            }
+                        }
+                        finally {
+                            select = false;
+                        }
+                    }
+                    else {
+                        park = true;
+
+                        try {
+                            long end = System.currentTimeMillis() + 2000;
+
+                            if (changeReqs.isEmpty())
+                                LockSupport.parkUntil(end);
+
+                            if (!selector.isOpen() || Thread.interrupted())
+                                return;
+                        }
+                        finally {
+                            park = false;
+                        }
                     }
 
                     long now = U.currentTimeMillis();
@@ -1664,9 +1714,12 @@ public class GridNioServer<T> {
             SelectionKey key = ses.key();
 
             if (key.isValid()) {
-                if ((key.interestOps() & SelectionKey.OP_WRITE) == 0)
+                if ((key.interestOps() & SelectionKey.OP_WRITE) == 0) {
                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
 
+                    writeSesCnt++;
+                }
+
                 // Update timestamp to protected against false write timeout.
                 ses.bytesSent(0);
             }


Mime
View raw message