tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r543538 - in /tomcat/trunk/java/org/apache/tomcat/util/net: NioBlockingSelector.java NioEndpoint.java NioSelectorPool.java
Date Fri, 01 Jun 2007 17:20:07 GMT
Author: fhanik
Date: Fri Jun  1 10:20:06 2007
New Revision: 543538

URL: http://svn.apache.org/viewvc?view=rev&rev=543538
Log:
Thread-safe handling of async writes and non-blocking writes. This required separating the
polling into one poller for incoming events and one poller for outgoing data.
This is still not thread safe when multiple async servlet threads write at the same time;
it is up to the comet developer to coordinate such writes.

Modified:
    tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?view=diff&rev=543538&r1=543537&r2=543538
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Fri Jun  1 10:20:06 2007
@@ -25,9 +25,43 @@
 
 import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
 import org.apache.tomcat.util.MutableInteger;
+import java.nio.channels.Selector;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import java.util.Iterator;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.CancelledKeyException;
+import java.util.concurrent.CountDownLatch;
 
 public class NioBlockingSelector {
+    
+    protected static Log log = LogFactory.getLog(NioBlockingSelector.class);
+    
+    private static int threadCounter = 0;
+    
+    protected Selector sharedSelector;
+    
+    protected BlockPoller poller;
     public NioBlockingSelector() {
+        
+    }
+    
+    public void open(Selector selector) {
+        sharedSelector = selector;
+        poller = new BlockPoller();
+        poller.selector = sharedSelector;
+        poller.setDaemon(true);
+        poller.setName("NioBlockingSelector.BlockPoller-"+(++threadCounter));
+        poller.start();
+    }
+    
+    public void close() {
+        if (poller!=null) {
+            poller.disable();
+            poller.interrupt();
+        }
     }
 
     /**
@@ -42,11 +76,10 @@
      * @throws SocketTimeoutException if the write times out
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
-    public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger
lastWrite) throws IOException {
+    public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger
lastWrite) throws IOException {
         SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
         if ( key == null ) throw new IOException("Key no longer registered");
         KeyAttachment att = (KeyAttachment) key.attachment();
-        int prevOps = att.interestOps() | (att.getCometOps()&NioEndpoint.OP_CALLBACK);
         int written = 0;
         boolean timedout = false;
         int keycount = 1; //assume we can write
@@ -66,8 +99,7 @@
                 }
                 try {
                     if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0)
att.startWriteLatch(1);
-                    //only register for write if a write has not yet been issued
-                    if ( (att.interestOps() & SelectionKey.OP_WRITE) == 0) socket.getPoller().add(socket,prevOps|SelectionKey.OP_WRITE);
+                    poller.add(att,SelectionKey.OP_WRITE);
                     att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
                 }catch (InterruptedException ignore) {
                     Thread.interrupted();
@@ -87,11 +119,11 @@
             if (timedout) 
                 throw new SocketTimeoutException();
         } finally {
+            poller.remove(att,SelectionKey.OP_WRITE);
             if (timedout && key != null) {
                 cancelKey(socket, key);
             }
         }
-        socket.getPoller().add(socket,prevOps);
         return written;
     }
 
@@ -108,11 +140,10 @@
      * @throws SocketTimeoutException if the read times out
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
-    public static int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException
{
+    public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException
{
         SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
         if ( key == null ) throw new IOException("Key no longer registered");
         KeyAttachment att = (KeyAttachment) key.attachment();
-        int prevOps = att.interestOps() | (att.getCometOps()&NioEndpoint.OP_CALLBACK);
         int read = 0;
         boolean timedout = false;
         int keycount = 1; //assume we can write
@@ -129,7 +160,7 @@
                 }
                 try {
                     if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1);
-                    if ( (att.interestOps() & SelectionKey.OP_READ) == 0) socket.getPoller().add(socket,prevOps|SelectionKey.OP_READ);
+                    poller.add(att,SelectionKey.OP_READ);
                     att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
                 }catch (InterruptedException ignore) {
                     Thread.interrupted();
@@ -148,11 +179,11 @@
             if (timedout)
                 throw new SocketTimeoutException();
         } finally {
+            poller.remove(att,SelectionKey.OP_READ);
             if (timedout && key != null) {
                 cancelKey(socket,key);
             }
         }
-        socket.getPoller().add(socket,prevOps);
         return read;
     }
 
@@ -163,6 +194,137 @@
                 socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
             }
         });
+    }
+    
+    protected class BlockPoller extends Thread {
+        protected boolean run = true;
+        protected Selector selector = null;
+        protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue();
+        public void disable() { run = false; selector.wakeup();}
+        
+        public void add(final KeyAttachment key, final int ops) {
+            Runnable r = new Runnable() {
+                public void run() {
+                    SocketChannel ch = key.getChannel().getIOChannel();
+                    SelectionKey sk = ch.keyFor(selector);
+                    try {
+                        if (sk == null) {
+                            sk = ch.register(selector, ops, key);
+                        } else {
+                            sk.interestOps(sk.interestOps() | ops);
+                        }
+                    }catch (ClosedChannelException cx) {
+                        if (sk!=null) sk.cancel();
+                    }
+                }
+            };
+            events.offer(r);
+        }
+        
+        public void remove(final KeyAttachment key, final int ops) {
+            Runnable r = new Runnable() {
+                public void run() {
+                    SocketChannel ch = key.getChannel().getIOChannel();
+                    SelectionKey sk = ch.keyFor(selector);
+                    try {
+                        if (sk == null) {
+                            if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch());
+                            if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+                        } else {
+                            sk.interestOps(sk.interestOps() & (~ops));
+                            if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch());
+                            if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+                            if (sk.interestOps()==0) {
+                                sk.cancel();
+                                sk.attach(null);
+                            }
+                        }
+                    }catch (CancelledKeyException cx) {
+                        if (sk!=null) {
+                            sk.cancel();
+                            sk.attach(null);
+                        }
+                    }
+                }
+            };
+            events.offer(r);
+            selector.wakeup();
+        }
+
+
+        public boolean events() {
+            boolean result = false;
+            Runnable r = null;
+            result = (events.size() > 0);
+            while ( (r = (Runnable)events.poll()) != null ) {
+                r.run();
+                result = true;
+            }
+            return result;
+        }
+
+        public void run() {
+            while (run) {
+                try {
+                    events();
+                    int keyCount = 0;
+                    try {
+                        keyCount = selector.select(1000);
+                        if (!run) break;
+                    }catch ( NullPointerException x ) {
+                        //sun bug 5076772 on windows JDK 1.5
+                        if (selector==null) throw x;
+                        continue;
+                    } catch ( CancelledKeyException x ) {
+                        //sun bug 5076772 on windows JDK 1.5
+                        continue;
+                    } catch (Throwable x) {
+                        log.error("",x);
+                        continue;
+                    }
+
+                    Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator()
: null;
+
+                    // Walk through the collection of ready keys and dispatch
+                    // any active event.
+                    while (run && iterator != null && iterator.hasNext())
{
+                        SelectionKey sk = (SelectionKey) iterator.next();
+                        KeyAttachment attachment = (KeyAttachment)sk.attachment();
+                        try {
+                            attachment.access();
+                            iterator.remove(); ;
+                            sk.interestOps(sk.interestOps() & (~sk.readyOps()));
+                            if ( sk.isReadable() ) {
+                                countDown(attachment.getReadLatch());
+                            }
+                            if (sk.isWritable()) {
+                                countDown(attachment.getWriteLatch());
+                            }
+                        }catch (CancelledKeyException ckx) {
+                            if (sk!=null) sk.cancel();
+                            countDown(attachment.getReadLatch());
+                            countDown(attachment.getWriteLatch());
+                        }
+                    }//while
+                }catch ( Throwable t ) {
+                    log.error("",t);
+                }
+            }
+            events.clear();
+            try {
+                selector.selectNow();//cancel all remaining keys
+            }catch( Exception ignore ) {
+                if (log.isDebugEnabled())log.debug("",ignore);
+            }
+        }
+        
+        public void countDown(CountDownLatch latch) {
+            if ( latch == null ) return;
+            latch.countDown();
+        }
+        
+        
+        
     }
 
 }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=543538&r1=543537&r2=543538
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Fri Jun  1 10:20:06 2007
@@ -737,7 +737,7 @@
         }
         
         if (oomParachute>0) reclaimParachute(true);
-
+        selectorPool.open();
         initialized = true;
 
     }
@@ -832,6 +832,7 @@
             }
             executor = null;
         }
+        
     }
 
 
@@ -849,6 +850,7 @@
         sslContext = null;
         initialized = false;
         releaseCaches();
+        selectorPool.close();
     }
 
 
@@ -1473,13 +1475,7 @@
                     sk.attach(attachment);//cant remember why this is here
                     NioChannel channel = attachment.getChannel();
                     if (sk.isReadable() || sk.isWritable() ) {
-                        if ( sk.isReadable() && attachment.getReadLatch() != null
) {
-                            unreg(sk, attachment,SelectionKey.OP_READ);
-                            attachment.getReadLatch().countDown();
-                        } else if ( sk.isWritable() && attachment.getWriteLatch()
!= null ) {
-                            unreg(sk, attachment,SelectionKey.OP_WRITE);
-                            attachment.getWriteLatch().countDown();
-                        } else if ( attachment.getSendfileData() != null ) {
+                        if ( attachment.getSendfileData() != null ) {
                             processSendfile(sk,attachment,true);
                         } else if ( attachment.getComet() ) {
                             //check if thread is available
@@ -1674,7 +1670,7 @@
         public CountDownLatch getReadLatch() { return readLatch; }
         public CountDownLatch getWriteLatch() { return writeLatch; }
         protected CountDownLatch resetLatch(CountDownLatch latch) {
-            if ( latch.getCount() == 0 ) return null;
+            if ( latch==null || latch.getCount() == 0 ) return null;
             else throw new IllegalStateException("Latch must be at count 0");
         }
         public void resetReadLatch() { readLatch = resetLatch(readLatch); }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java?view=diff&rev=543538&r1=543537&r2=543538
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java Fri Jun  1 10:20:06 2007
@@ -39,12 +39,19 @@
  */
 
 public class NioSelectorPool {
+    
+    public NioSelectorPool() {
+    }
+    
     protected static int threadCount = 0;
     
     protected static Log log = LogFactory.getLog(NioSelectorPool.class);
 
     protected final static boolean SHARED =
         Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared",
"true")).booleanValue();
+    
+    protected NioBlockingSelector blockingSelector;
+    
     protected Selector SHARED_SELECTOR;
     
     protected int maxSelectors = 200;
@@ -107,6 +114,9 @@
         while ( (s = selectors.poll()) != null ) s.close();
         spare.set(0);
         active.set(0);
+        if (blockingSelector!=null) {
+            blockingSelector.close();
+        }
         if ( SHARED && getSharedSelector()!=null ) {
             getSharedSelector().close();
             SHARED_SELECTOR = null;
@@ -116,6 +126,11 @@
     public void open() throws IOException {
         enabled = true;
         getSharedSelector();
+        if (SHARED) {
+            blockingSelector = new NioBlockingSelector();
+            blockingSelector.open(getSharedSelector());
+        }
+
     }
 
     /**
@@ -142,7 +157,7 @@
             buf = socket.getBufHandler().getWriteBuffer();
         }
         if ( SHARED && block ) {
-            return NioBlockingSelector.write(buf,socket,writeTimeout,lastWrite);
+            return blockingSelector.write(buf,socket,writeTimeout,lastWrite);
         }
         SelectionKey key = null;
         int written = 0;
@@ -215,7 +230,7 @@
      */
     public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout,
boolean block) throws IOException {
         if ( SHARED && block ) {
-            return NioBlockingSelector.read(buf,socket,readTimeout);
+            return blockingSelector.read(buf,socket,readTimeout);
         }
         SelectionKey key = null;
         int read = 0;



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message