tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r719264 - /tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
Date Thu, 20 Nov 2008 16:13:02 GMT
Author: fhanik
Date: Thu Nov 20 08:13:02 2008
New Revision: 719264

URL: http://svn.apache.org/viewvc?rev=719264&view=rev
Log:
Fixed read/write timeouts - backport of http://svn.apache.org/viewvc?view=rev&revision=707670

Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?rev=719264&r1=719263&r2=719264&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Thu Nov 20 08:13:02 2008
@@ -81,6 +81,7 @@
     public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException {
         SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
         if ( key == null ) throw new IOException("Key no longer registered");
+        KeyReference reference = new KeyReference();
         KeyAttachment att = (KeyAttachment) key.attachment();
         int written = 0;
         boolean timedout = false;
@@ -101,7 +102,7 @@
                 }
                 try {
                     if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
-                    poller.add(att,SelectionKey.OP_WRITE);
+                    poller.add(att,SelectionKey.OP_WRITE,reference);
                     att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
                 }catch (InterruptedException ignore) {
                     Thread.interrupted();
@@ -122,9 +123,10 @@
                 throw new SocketTimeoutException();
         } finally {
             poller.remove(att,SelectionKey.OP_WRITE);
-            if (timedout && key != null) {
-                poller.cancelKey(socket, key);
+            if (timedout && reference.key!=null) {
+                poller.cancelKey(reference.key);
             }
+            reference.key = null;
         }
         return written;
     }
@@ -145,6 +147,7 @@
     public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
         SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
         if ( key == null ) throw new IOException("Key no longer registered");
+        KeyReference reference = new KeyReference();
         KeyAttachment att = (KeyAttachment) key.attachment();
         int read = 0;
         boolean timedout = false;
@@ -162,7 +165,7 @@
                 }
                 try {
                     if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1);
-                    poller.add(att,SelectionKey.OP_READ);
+                    poller.add(att,SelectionKey.OP_READ, reference);
                     att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
                 }catch (InterruptedException ignore) {
                     Thread.interrupted();
@@ -182,9 +185,10 @@
                 throw new SocketTimeoutException();
         } finally {
             poller.remove(att,SelectionKey.OP_READ);
-            if (timedout && key != null) {
-                poller.cancelKey(socket,key);
+            if (timedout && reference.key!=null) {
+                poller.cancelKey(reference.key);
             }
+            reference.key = null;
         }
         return read;
     }
@@ -193,10 +197,10 @@
     protected class BlockPoller extends Thread {
         protected boolean run = true;
         protected Selector selector = null;
-        protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue();
+        protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
         public void disable() { run = false; selector.wakeup();}
         protected AtomicInteger wakeupCounter = new AtomicInteger(0);
-        public void cancelKey(final NioChannel socket, final SelectionKey key) {
+        public void cancelKey(final SelectionKey key) {
             Runnable r = new Runnable() {
                 public void run() {
                     key.cancel();
@@ -219,7 +223,7 @@
             }
         }
         
-        public void add(final KeyAttachment key, final int ops) {
+        public void add(final KeyAttachment key, final int ops, final KeyReference ref) {
             Runnable r = new Runnable() {
                 public void run() {
                     if ( key == null ) return;
@@ -231,6 +235,9 @@
                     try {
                         if (sk == null) {
                             sk = ch.register(selector, ops, key);
+                            ref.key = sk;
+                        } else if (!sk.isValid()) {
+                            cancel(sk,key,ops);
                         } else {
                             sk.interestOps(sk.interestOps() | ops);
                         }
@@ -259,10 +266,15 @@
                             if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch());
                             if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
                         } else {
-                            sk.interestOps(sk.interestOps() & (~ops));
-                            if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch());
-                            if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
-                            if (sk.interestOps()==0) {
+                            if (sk.isValid()) {
+                            	sk.interestOps(sk.interestOps() & (~ops));
+                            	if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch());
+                            	if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+                            	if (sk.interestOps()==0) {
+                            	    sk.cancel();
+                            	    sk.attach(null);
+                            	}
+                            }else {
                                 sk.cancel();
                                 sk.attach(null);
                             }
@@ -284,7 +296,7 @@
             boolean result = false;
             Runnable r = null;
             result = (events.size() > 0);
-            while ( (r = (Runnable)events.poll()) != null ) {
+            while ( (r = events.poll()) != null ) {
                 r.run();
                 result = true;
             }
@@ -320,12 +332,12 @@
                         continue;
                     }
 
-                    Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
+                    Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
 
                     // Walk through the collection of ready keys and dispatch
                     // any active event.
                     while (run && iterator != null && iterator.hasNext()) {
-                        SelectionKey sk = (SelectionKey) iterator.next();
+                        SelectionKey sk = iterator.next();
                         KeyAttachment attachment = (KeyAttachment)sk.attachment();
                         try {
                             attachment.access();
@@ -353,15 +365,30 @@
             }catch( Exception ignore ) {
                 if (log.isDebugEnabled())log.debug("",ignore);
             }
+            try {
+                selector.close();//Close the connector
+            }catch( Exception ignore ) {
+                if (log.isDebugEnabled())log.debug("",ignore);
+            }
         }
         
         public void countDown(CountDownLatch latch) {
             if ( latch == null ) return;
             latch.countDown();
         }
+    }
+    
+    public class KeyReference {
+        SelectionKey key = null;
         
-        
-        
+        @Override
+        public void finalize() {
+            if (key!=null && key.isValid()) {
+                log.warn("Possible key leak, cancelling key in the finalizer.");
+                try {key.cancel();}catch (Exception ignore){}
+            }
+            key = null;
+        }
     }
 
 }
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message