tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r596486 - in /tomcat/tc6.0.x/trunk: ./ java/org/apache/coyote/http11/ java/org/apache/tomcat/util/ java/org/apache/tomcat/util/net/ webapps/docs/
Date Mon, 19 Nov 2007 23:28:26 GMT
Author: fhanik
Date: Mon Nov 19 15:28:21 2007
New Revision: 596486

URL: http://svn.apache.org/viewvc?rev=596486&view=rev
Log:
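Fix BZ 43846: fix output of data on simulated blocking IO in the NIO connector, improve the speed of reading and writing, and add non-blocking request header parsing (port of improvements from the old trunk).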

Added:
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/MutableInteger.java
Modified:
    tomcat/tc6.0.x/trunk/STATUS.txt
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
    tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml

Modified: tomcat/tc6.0.x/trunk/STATUS.txt
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/STATUS.txt?rev=596486&r1=596485&r2=596486&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/STATUS.txt (original)
+++ tomcat/tc6.0.x/trunk/STATUS.txt Mon Nov 19 15:28:21 2007
@@ -35,15 +35,6 @@
   http://people.apache.org/~jfclere/patches/test_cookies.patch
   +1: jfclere
   -1: fhanik - Can we add the 'package' directive to make the package match the dir structure
-
-* Fix BZ 43846
-  Fix output of data on simulated blocking IO
-  Improve speed of writing and reading
-  Add in non blocking request header parsing
-  This is a port of improvements from the old trunk and fixes the BZ above and improves greatly on the NIO connector
-  http://people.apache.org/~fhanik/patches/fix-nio-blocking-output.patch
-  +1: fhanik, jim, pero
-  -1: 
   
 * Fix Comet bug, if servlet calls cometEvent.close() on the BEGIN event, the request still is marked as comet and ends up in a funky state
   http://people.apache.org/~fhanik/patches/comet-begin-event-close.patch

Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?rev=596486&r1=596485&r2=596486&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java Mon Nov 19 15:28:21 2007
@@ -90,12 +90,12 @@
 
         request = new Request();
         int readTimeout = endpoint.getSoTimeout();
-        inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize,readTimeout);
+        inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize);
         request.setInputBuffer(inputBuffer);
 
         response = new Response();
         response.setHook(this);
-        outputBuffer = new InternalNioOutputBuffer(response, maxHttpHeaderSize,readTimeout);
+        outputBuffer = new InternalNioOutputBuffer(response, maxHttpHeaderSize);
         response.setOutputBuffer(outputBuffer);
         request.setResponse(response);
 
@@ -751,10 +751,7 @@
                 NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
                 if (attach != null) {
                     attach.setComet(comet);
-                    if (comet) {
-                        Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout");
-                        if (comettimeout != null) attach.setTimeout(comettimeout.longValue());
-                    } else {
+                    if (!comet) {
                         //reset the timeout
                         attach.setTimeout(endpoint.getSocketProperties().getSoTimeout());
                     }
@@ -794,14 +791,6 @@
         RequestInfo rp = request.getRequestProcessor();
         rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
 
-        // Set the remote address
-        remoteAddr = null;
-        remoteHost = null;
-        localAddr = null;
-        localName = null;
-        remotePort = -1;
-        localPort = -1;
-
         // Setting up the socket
         this.socket = socket;
         inputBuffer.setSocket(socket);
@@ -829,30 +818,30 @@
             try {
                 if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) {
                     socket.getIOChannel().socket().setSoTimeout((int)soTimeout);
-                    inputBuffer.readTimeout = soTimeout;
                 }
-                if (!inputBuffer.parseRequestLine(keptAlive && (endpoint.getCurrentThreadsBusy() >= limit))) {
-                    // This means that no data is available right now
-                    // (long keepalive), so that the processor should be recycled
-                    // and the method should return true
+                if (!inputBuffer.parseRequestLine(keptAlive)) {
+                    //no data available yet, since we might have read part
+                    //of the request line, we can't recycle the processor
                     openSocket = true;
-                    // Add the socket to the poller
-                    socket.getPoller().add(socket);
+                    recycle = false;
                     break;
                 }
                 keptAlive = true;
                 if ( !inputBuffer.parseHeaders() ) {
+                    //we've read part of the request, don't recycle it
+                    //instead associate it with the socket
                     openSocket = true;
-                    socket.getPoller().add(socket);
                     recycle = false;
                     break;
                 }
                 request.setStartTime(System.currentTimeMillis());
                 if (!disableUploadTimeout) { //only for body, not for request headers
                     socket.getIOChannel().socket().setSoTimeout((int)timeout);
-                    inputBuffer.readTimeout = soTimeout;
                 }
             } catch (IOException e) {
+                if (log.isDebugEnabled()) {
+                    log.debug(sm.getString("http11processor.header.parse"), e);
+                }
                 error = true;
                 break;
             } catch (Throwable t) {
@@ -900,10 +889,6 @@
                         NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
                         if (attach != null)  {
                             attach.setComet(comet);
-                            if (comet) {
-                                Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout");
-                                if (comettimeout != null) attach.setTimeout(comettimeout.longValue());
-                            }
                         }
                     }
                 } catch (InterruptedIOException e) {
@@ -961,7 +946,8 @@
             }
         } else {
             if ( recycle ) recycle();
-            return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;
+            //return (openSocket) ? (SocketState.OPEN) : SocketState.CLOSED;
+            return (openSocket) ? (recycle?SocketState.OPEN:SocketState.LONG) : SocketState.CLOSED;
         }
 
     }
@@ -998,6 +984,12 @@
         this.socket = null;
         this.cometClose = false;
         this.comet = false;
+        remoteAddr = null;
+        remoteHost = null;
+        localAddr = null;
+        localName = null;
+        remotePort = -1;
+        localPort = -1;
     }
 
 
@@ -1070,7 +1062,7 @@
                 if ( attach!=null && attach.getComet()) {
                     //if this is a comet connection
                     //then execute the connection closure at the next selector loop
-                    request.getAttributes().remove("org.apache.tomcat.comet.timeout");
+                    //request.getAttributes().remove("org.apache.tomcat.comet.timeout");
                     //attach.setTimeout(5000); //force a cleanup in 5 seconds
                     //attach.setError(true); //this has caused concurrency errors
                 }

Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java?rev=596486&r1=596485&r2=596486&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java Mon Nov 19 15:28:21 2007
@@ -95,12 +95,13 @@
      * Set a property.
      */
     public boolean setProperty(String name, String value) {
-        setAttribute(name, value);
+        setAttribute(name, value); //store all settings
         if ( name!=null && (name.startsWith("socket.") ||name.startsWith("selectorPool.")) ){
             return ep.setProperty(name, value);
         } else {
             return ep.setProperty(name,value); //make sure we at least try to set all properties
         }
+        
     }
 
     /**
@@ -632,6 +633,14 @@
         public void releaseCaches() {
             recycledProcessors.clear();
         }
+        
+        public void release(NioChannel socket) {
+            Http11NioProcessor result = connections.remove(socket);
+            if ( result != null ) {
+                result.recycle();
+                recycledProcessors.offer(result);
+            }
+        }
 
         public SocketState event(NioChannel socket, SocketStatus status) {
             Http11NioProcessor result = connections.get(socket);
@@ -671,7 +680,9 @@
                         }
                     } else {
                         if (log.isDebugEnabled()) log.debug("Keeping processor["+result);
-                        socket.getPoller().add(socket);
+                        //add correct poller events here based on Comet stuff
+                        NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+                        socket.getPoller().add(socket,att.getCometOps());
                     }
                 }
             }
@@ -681,6 +692,8 @@
         public SocketState process(NioChannel socket) {
             Http11NioProcessor processor = null;
             try {
+                processor = connections.remove(socket);
+                
                 if (processor == null) {
                     processor = recycledProcessors.poll();
                 }
@@ -708,9 +721,14 @@
                     // Associate the connection with the processor. The next request 
                     // processed by this thread will use either a new or a recycled
                     // processor.
-                    if (log.isDebugEnabled()) log.debug("Not recycling ["+processor+"] Comet="+((NioEndpoint.KeyAttachment)socket.getAttachment(false)).getComet());
+                    //if (log.isDebugEnabled()) log.debug("Not recycling ["+processor+"] Comet="+((NioEndpoint.KeyAttachment)socket.getAttachment(false)).getComet());
                     connections.put(socket, processor);
-                    socket.getPoller().add(socket);
+                    if (processor.comet) {
+                        NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+                        socket.getPoller().add(socket,att.getCometOps());
+                    } else {
+                        socket.getPoller().add(socket);
+                    }
                 } else {
                     recycledProcessors.offer(processor);
                 }

Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?rev=596486&r1=596485&r2=596486&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java Mon Nov 19 15:28:21 2007
@@ -30,6 +30,7 @@
 import org.apache.tomcat.util.net.NioChannel;
 import org.apache.tomcat.util.net.NioSelectorPool;
 import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.net.NioEndpoint;
 
 /**
  * Implementation of InputBuffer which provides HTTP request header parsing as
@@ -51,8 +52,7 @@
     /**
      * Alternate constructor.
      */
-    public InternalNioInputBuffer(Request request, int headerBufferSize, 
-                                  long readTimeout) {
+    public InternalNioInputBuffer(Request request, int headerBufferSize) {
 
         this.request = request;
         headers = request.getMimeHeaders();
@@ -72,16 +72,14 @@
 
         parsingHeader = true;
         parsingRequestLine = true;
+        parsingRequestLinePhase = 0;
+        parsingRequestLineEol = false;
+        parsingRequestLineStart = 0;
+        parsingRequestLineQPos = -1;
         headerParsePos = HeaderParsePosition.HEADER_START;
         headerData.recycle();
         swallowInput = true;
 
-        if (readTimeout < 0) {
-            this.readTimeout = -1;
-        } else {
-            this.readTimeout = readTimeout;
-        }
-
     }
 
 
@@ -111,10 +109,15 @@
 
 
     /**
-     * State.
+     * Parsing state - used for non blocking parsing so that
+     * when more data arrives, we can pick up where we left off.
      */
     protected boolean parsingHeader;
     protected boolean parsingRequestLine;
+    protected int parsingRequestLinePhase = 0;
+    protected boolean parsingRequestLineEol = false;
+    protected int parsingRequestLineStart = 0;
+    protected int parsingRequestLineQPos = -1;
     protected HeaderParsePosition headerParsePos;
 
 
@@ -186,12 +189,6 @@
     protected int lastActiveFilter;
 
 
-    /**
-     * The socket timeout used when reading the first block of the request
-     * header.
-     */
-    protected long readTimeout;
-
     // ------------------------------------------------------------- Properties
 
 
@@ -287,7 +284,23 @@
     }
 
     // --------------------------------------------------------- Public Methods
-
+    /**
+     * Returns true if there are bytes available from the socket layer
+     * @return boolean
+     * @throws IOException
+     */
+    public boolean isReadable() throws IOException {
+        return (pos < lastValid) || (nbRead()>0);
+    }
+    
+    /**
+     * Issues a non blocking read
+     * @return int
+     * @throws IOException
+     */
+    public int nbRead() throws IOException {
+        return readSocket(true,false);
+    }
 
     /**
      * Recycle the input buffer. This should be called when closing the 
@@ -309,6 +322,10 @@
         parsingHeader = true;
         headerParsePos = HeaderParsePosition.HEADER_START;
         parsingRequestLine = true;
+        parsingRequestLinePhase = 0;
+        parsingRequestLineEol = false;
+        parsingRequestLineStart = 0;
+        parsingRequestLineQPos = -1;
         headerData.recycle();
         swallowInput = true;
 
@@ -350,6 +367,10 @@
         parsingHeader = true;
         headerParsePos = HeaderParsePosition.HEADER_START;
         parsingRequestLine = true;
+        parsingRequestLinePhase = 0;
+        parsingRequestLineEol = false;
+        parsingRequestLineStart = 0;
+        parsingRequestLineQPos = -1;
         headerData.recycle();
         swallowInput = true;
 
@@ -388,160 +409,137 @@
 
         //check state
         if ( !parsingRequestLine ) return true;
-        
-        int start = 0;
-
         //
         // Skipping blank lines
         //
-
-        byte chr = 0;
-        do {
-
-            // Read new bytes if needed
+        if ( parsingRequestLinePhase == 0 ) {
+            byte chr = 0;
+            do {
+                
+                // Read new bytes if needed
+                if (pos >= lastValid) {
+                    if (useAvailableData) {
+                        return false;
+                    }
+                    // Do a simple read with a short timeout
+                    if ( readSocket(true, false)==0 ) return false;
+                }
+                chr = buf[pos++];
+            } while ((chr == Constants.CR) || (chr == Constants.LF));
+            pos--;
+            parsingRequestLineStart = pos;
+            parsingRequestLinePhase = 1;
+        } 
+        if ( parsingRequestLinePhase == 1 ) {
+            // Mark the current buffer position
+            
             if (pos >= lastValid) {
                 if (useAvailableData) {
                     return false;
                 }
-                if (readTimeout == -1) {
-                    if (!fill(false,true)) //request line parsing
-                        throw new EOFException(sm.getString("iib.eof.error"));
-                } else {
-                    // Do a simple read with a short timeout
-                    if ( !readSocket(true, false) ) return false;
-                }
-            }
-
-            chr = buf[pos++];
-
-        } while ((chr == Constants.CR) || (chr == Constants.LF));
-
-        pos--;
-
-        // Mark the current buffer position
-        start = pos;
-
-        if (pos >= lastValid) {
-            if (useAvailableData) {
-                return false;
-            }
-            if (readTimeout == -1) {
-                if (!fill(false,true)) //request line parsing
-                    return false;
-            } else {
                 // Do a simple read with a short timeout
-                if ( !readSocket(true, true) ) return false;
+                if ( readSocket(true, false)==0 ) return false;
             }
+            parsingRequestLinePhase = 2;
         }
-
-        //
-        // Reading the method name
-        // Method name is always US-ASCII
-        //
-
-        boolean space = false;
-
-        while (!space) {
-
-            // Read new bytes if needed
-            if (pos >= lastValid) {
-                if (!fill(true,true)) //request line parsing
-                    return false;
-            }
-
-            if (buf[pos] == Constants.SP) {
-                space = true;
-                request.method().setBytes(buf, start, pos - start);
+        if ( parsingRequestLinePhase == 2 ) {
+            //
+            // Reading the method name
+            // Method name is always US-ASCII
+            //
+            boolean space = false;
+            while (!space) {
+                // Read new bytes if needed
+                if (pos >= lastValid) {
+                    if (!fill(true, false)) //request line parsing
+                        return false;
+                }
+                if (buf[pos] == Constants.SP) {
+                    space = true;
+                    request.method().setBytes(buf, parsingRequestLineStart, pos - parsingRequestLineStart);
+                }
+                pos++;
             }
-
-            pos++;
-
+            parsingRequestLineStart = pos;
+            parsingRequestLinePhase = 3;
         }
-
-        // Mark the current buffer position
-        start = pos;
-        int end = 0;
-        int questionPos = -1;
-
-        //
-        // Reading the URI
-        //
-
-        space = false;
-        boolean eol = false;
-
-        while (!space) {
-
-            // Read new bytes if needed
-            if (pos >= lastValid) {
-                if (!fill(true,true)) //request line parsing
-                    return false;
+        if ( parsingRequestLinePhase == 3 ) {
+            // Mark the current buffer position
+            
+            int end = 0;
+            //
+            // Reading the URI
+            //
+            boolean space = false;
+            while (!space) {
+                // Read new bytes if needed
+                if (pos >= lastValid) {
+                    if (!fill(true,false)) //request line parsing
+                        return false;
+                }
+                if (buf[pos] == Constants.SP) {
+                    space = true;
+                    end = pos;
+                } else if ((buf[pos] == Constants.CR) 
+                           || (buf[pos] == Constants.LF)) {
+                    // HTTP/0.9 style request
+                    parsingRequestLineEol = true;
+                    space = true;
+                    end = pos;
+                } else if ((buf[pos] == Constants.QUESTION) 
+                           && (parsingRequestLineQPos == -1)) {
+                    parsingRequestLineQPos = pos;
+                }
+                pos++;
             }
-
-            if (buf[pos] == Constants.SP) {
-                space = true;
-                end = pos;
-            } else if ((buf[pos] == Constants.CR) 
-                       || (buf[pos] == Constants.LF)) {
-                // HTTP/0.9 style request
-                eol = true;
-                space = true;
-                end = pos;
-            } else if ((buf[pos] == Constants.QUESTION) 
-                       && (questionPos == -1)) {
-                questionPos = pos;
+            request.unparsedURI().setBytes(buf, parsingRequestLineStart, end - parsingRequestLineStart);
+            if (parsingRequestLineQPos >= 0) {
+                request.queryString().setBytes(buf, parsingRequestLineQPos + 1, 
+                                               end - parsingRequestLineQPos - 1);
+                request.requestURI().setBytes(buf, parsingRequestLineStart, parsingRequestLineQPos - parsingRequestLineStart);
+            } else {
+                request.requestURI().setBytes(buf, parsingRequestLineStart, end - parsingRequestLineStart);
             }
-
-            pos++;
-
-        }
-
-        request.unparsedURI().setBytes(buf, start, end - start);
-        if (questionPos >= 0) {
-            request.queryString().setBytes(buf, questionPos + 1, 
-                                           end - questionPos - 1);
-            request.requestURI().setBytes(buf, start, questionPos - start);
-        } else {
-            request.requestURI().setBytes(buf, start, end - start);
+            parsingRequestLineStart = pos;
+            parsingRequestLinePhase = 4;
         }
-
-        // Mark the current buffer position
-        start = pos;
-        end = 0;
-
-        //
-        // Reading the protocol
-        // Protocol is always US-ASCII
-        //
-
-        while (!eol) {
-
-            // Read new bytes if needed
-            if (pos >= lastValid) {
-                if (!fill(true,true)) //reques line parsing
-                    return false;
-            }
-
-            if (buf[pos] == Constants.CR) {
-                end = pos;
-            } else if (buf[pos] == Constants.LF) {
-                if (end == 0)
+        if ( parsingRequestLinePhase == 4 ) {
+            // Mark the current buffer position
+            
+            end = 0;
+            //
+            // Reading the protocol
+            // Protocol is always US-ASCII
+            //
+            while (!parsingRequestLineEol) {
+                // Read new bytes if needed
+                if (pos >= lastValid) {
+                    if (!fill(true, false)) //reques line parsing
+                        return false;
+                }
+        
+                if (buf[pos] == Constants.CR) {
                     end = pos;
-                eol = true;
+                } else if (buf[pos] == Constants.LF) {
+                    if (end == 0)
+                        end = pos;
+                    parsingRequestLineEol = true;
+                }
+                pos++;
             }
-
-            pos++;
-
-        }
-
-        if ((end - start) > 0) {
-            request.protocol().setBytes(buf, start, end - start);
-        } else {
-            request.protocol().setString("");
+        
+            if ( (end - parsingRequestLineStart) > 0) {
+                request.protocol().setBytes(buf, parsingRequestLineStart, end - parsingRequestLineStart);
+            } else {
+                request.protocol().setString("");
+            }
+            parsingRequestLine = false;
+            parsingRequestLinePhase = 0;
+            parsingRequestLineEol = false;
+            parsingRequestLineStart = 0;
+            return true;
         }
-        parsingRequestLine = false;
-        return true;
-
+        throw new IllegalStateException("Invalid request line parse phase:"+parsingRequestLinePhase);
     }
     
     private void expand(int newsize) {
@@ -552,6 +550,7 @@
             tmp = null;
         }
     }
+    
     /**
      * Perform blocking read with a timeout if desired
      * @param timeout boolean - if we want to use the timeout data
@@ -560,15 +559,16 @@
      * @throws IOException if a socket exception occurs
      * @throws EOFException if end of stream is reached
      */
-    private boolean readSocket(boolean timeout, boolean block) throws IOException {
+    private int readSocket(boolean timeout, boolean block) throws IOException {
         int nRead = 0;
-        long rto = timeout?this.readTimeout:-1;
         socket.getBufHandler().getReadBuffer().clear();
         if ( block ) {
             Selector selector = null;
             try { selector = getSelectorPool().get(); }catch ( IOException x ) {}
             try {
-                nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,rto);
+                NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+                if ( att == null ) throw new IOException("Key must be cancelled.");
+                nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,att.getTimeout());
             } catch ( EOFException eof ) {
                 nRead = -1;
             } finally { 
@@ -583,12 +583,12 @@
             expand(nRead + pos);
             socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
             lastValid = pos + nRead;
-            return true;
+            return nRead;
         } else if (nRead == -1) {
             //return false;
             throw new EOFException(sm.getString("iib.eof.error"));
         } else {
-            return false;
+            return 0;
         }
     }
 
@@ -630,7 +630,7 @@
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill(true,true)) {//parse header 
+                if (!fill(true,false)) {//parse header 
                     headerParsePos = HeaderParsePosition.HEADER_START;
                     return HeaderParseStatus.NEED_MORE_DATA;
                 }
@@ -668,7 +668,7 @@
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill(true,true)) { //parse header 
+                if (!fill(true,false)) { //parse header 
                     return HeaderParseStatus.NEED_MORE_DATA;
                 }
             }
@@ -708,7 +708,7 @@
 
                     // Read new bytes if needed
                     if (pos >= lastValid) {
-                        if (!fill(true,true)) {//parse header 
+                        if (!fill(true,false)) {//parse header 
                             //HEADER_VALUE, should already be set
                             return HeaderParseStatus.NEED_MORE_DATA;
                         }
@@ -729,7 +729,7 @@
 
                     // Read new bytes if needed
                     if (pos >= lastValid) {
-                        if (!fill(true,true)) {//parse header 
+                        if (!fill(true,false)) {//parse header 
                             //HEADER_VALUE
                             return HeaderParseStatus.NEED_MORE_DATA;
                         }
@@ -760,7 +760,7 @@
             }
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill(true,true)) {//parse header
+                if (!fill(true,false)) {//parse header
                     
                     //HEADER_MULTI_LINE
                     return HeaderParseStatus.NEED_MORE_DATA;
@@ -852,7 +852,7 @@
             }
 
             // Do a simple read with a short timeout
-            read = readSocket(timeout,block);
+            read = readSocket(timeout,block)>0;
         } else {
 
             if (buf.length - end < 4500) {
@@ -865,7 +865,7 @@
             pos = end;
             lastValid = pos;
             // Do a simple read with a short timeout
-            read = readSocket(timeout, block);
+            read = readSocket(timeout, block)>0;
         }
         return read;
     }

Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=596486&r1=596485&r2=596486&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Mon Nov 19 15:28:21 2007
@@ -34,6 +34,8 @@
 import org.apache.tomcat.util.net.NioEndpoint;
 import org.apache.tomcat.util.net.NioSelectorPool;
 import org.apache.tomcat.util.res.StringManager;
+import java.io.EOFException;
+import org.apache.tomcat.util.MutableInteger;
 
 /**
  * Output buffer.
@@ -56,14 +58,14 @@
      * Default constructor.
      */
     public InternalNioOutputBuffer(Response response) {
-        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000);
+        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE);
     }
 
 
     /**
      * Alternate constructor.
      */
-    public InternalNioOutputBuffer(Response response, int headerBufferSize, long writeTimeout) {
+    public InternalNioOutputBuffer(Response response, int headerBufferSize) {
 
         this.response = response;
         headers = response.getMimeHeaders();
@@ -86,8 +88,6 @@
         committed = false;
         finished = false;
         
-        this.writeTimeout = writeTimeout;
-
         // Cause loading of HttpMessages
         HttpMessages.getMessage(200);
 
@@ -142,6 +142,10 @@
      */
     protected int pos;
 
+    /**
+     * Number of bytes last written
+     */
+    protected MutableInteger lastWrite = new MutableInteger(1);
 
     /**
      * Underlying socket.
@@ -179,12 +183,6 @@
      */
     protected int lastActiveFilter;
     
-    /**
-     * Write time out in milliseconds
-     */
-    protected long writeTimeout = -1;
-
-
     // ------------------------------------------------------------- Properties
 
 
@@ -195,10 +193,6 @@
         this.socket = socket;
     }
 
-    public void setWriteTimeout(long writeTimeout) {
-        this.writeTimeout = writeTimeout;
-    }
-
     /**
      * Get the underlying socket input stream.
      */
@@ -206,10 +200,6 @@
         return socket;
     }
 
-    public long getWriteTimeout() {
-        return writeTimeout;
-    }
-
     public void setSelectorPool(NioSelectorPool pool) { 
         this.pool = pool;
     }
@@ -324,7 +314,6 @@
 
         // Recycle Request object
         response.recycle();
-
     }
 
 
@@ -347,6 +336,7 @@
         lastActiveFilter = -1;
         committed = false;
         finished = false;
+        lastWrite.set(1);
 
     }
 
@@ -405,11 +395,13 @@
 
     }
 
-
+    public boolean isWritable() {
+        return lastWrite.get()>0;
+    }
     // ------------------------------------------------ HTTP/1.1 Output Methods
 
 
-    /**
+    /** 
     * Send an acknowledgement.
      */
     public void sendAck()
@@ -418,15 +410,26 @@
         if (!committed) {
             //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0
             socket.getBufHandler() .getWriteBuffer().put(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length);    
-            writeToSocket(socket.getBufHandler() .getWriteBuffer(),true);
+            writeToSocket(socket.getBufHandler() .getWriteBuffer(),true,true);
         }
 
     }
 
-    private synchronized void writeToSocket(ByteBuffer bytebuffer, boolean flip) throws IOException {
-        //int limit = bytebuffer.position();
+    /**
+     * 
+     * @param bytebuffer ByteBuffer
+     * @param flip boolean
+     * @return int
+     * @throws IOException
+     * @todo Fix non blocking write properly
+     */
+    private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
         if ( flip ) bytebuffer.flip();
+
         int written = 0;
+        NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+        if ( att == null ) throw new IOException("Key must be cancelled");
+        long writeTimeout = att.getTimeout();
         Selector selector = null;
         try {
             selector = getSelectorPool().get();
@@ -434,16 +437,17 @@
             //ignore
         }
         try {
-            written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout);
+            written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout, block,lastWrite);
             //make sure we are flushed 
             do {
-                if (socket.flush(true,selector,writeTimeout)) break;
+                if (socket.flush(true,selector,writeTimeout,lastWrite)) break;
             }while ( true );
         }finally { 
             if ( selector != null ) getSelectorPool().put(selector);
         }
-        socket.getBufHandler().getWriteBuffer().clear();
+        if ( block ) bytebuffer.clear(); //only clear
         this.total = 0;
+        return written;
     } 
 
 
@@ -762,7 +766,8 @@
 
         //write to the socket, if there is anything to write
         if (socket.getBufHandler().getWriteBuffer().position() > 0) {
-            writeToSocket(socket.getBufHandler().getWriteBuffer(),true);
+            socket.getBufHandler().getWriteBuffer().flip();
+            writeToSocket(socket.getBufHandler().getWriteBuffer(),true, false);
         }
     }
 

Added: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/MutableInteger.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/MutableInteger.java?rev=596486&view=auto
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/MutableInteger.java (added)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/MutableInteger.java Mon Nov 19 15:28:21 2007
@@ -0,0 +1,29 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.tomcat.util;
+
+public class MutableInteger { // Plain mutable holder for an int; no synchronization is done in this class.
+    protected int value = 0; // the held value, 0 unless set otherwise
+    public MutableInteger() {} // leaves value at its default of 0
+    public MutableInteger(int val) { // starts the holder at val
+        this.value = val;
+    }
+
+    public int get() { return value;} // returns the held value
+    public void set(int val) {this.value = val;} // overwrites the held value
+}

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?rev=596486&r1=596485&r2=596486&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Mon Nov 19 15:28:21 2007
@@ -20,13 +20,50 @@
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.MutableInteger;
 import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class NioBlockingSelector {
+    
+    protected static Log log = LogFactory.getLog(NioBlockingSelector.class);
+    
+    private static int threadCounter = 0;
+    
+    protected Selector sharedSelector;
+    
+    protected BlockPoller poller;
     public NioBlockingSelector() {
+        
+    }
+    
+    public void open(Selector selector) {
+        sharedSelector = selector;
+        poller = new BlockPoller();
+        poller.selector = sharedSelector;
+        poller.setDaemon(true);
+        poller.setName("NioBlockingSelector.BlockPoller-"+(++threadCounter));
+        poller.start();
+    }
+    
+    public void close() {
+        if (poller!=null) {
+            poller.disable();
+            poller.interrupt();
+            poller = null;
+        }
     }
 
     /**
@@ -41,8 +78,10 @@
      * @throws SocketTimeoutException if the write times out
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
-    public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout) throws IOException {
+    public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException {
         SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+        if ( key == null ) throw new IOException("Key no longer registered");
+        KeyAttachment att = (KeyAttachment) key.attachment();
         int written = 0;
         boolean timedout = false;
         int keycount = 1; //assume we can write
@@ -51,6 +90,7 @@
             while ( (!timedout) && buf.hasRemaining()) {
                 if (keycount > 0) { //only write if we were registered for a write
                     int cnt = socket.write(buf); //write the data
+                    lastWrite.set(cnt);
                     if (cnt == -1)
                         throw new EOFException();
                     written += cnt;
@@ -59,12 +99,9 @@
                         continue; //we successfully wrote, try again without a selector
                     }
                 }
-                if ( key == null ) throw new IOException("Key no longer registered");
-                KeyAttachment att = (KeyAttachment) key.attachment();
                 try {
                     if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
-                    //only register for write if a write has not yet been issued
-                    if ( (att.interestOps() & SelectionKey.OP_WRITE) == 0) socket.getPoller().add(socket,SelectionKey.OP_WRITE);
+                    poller.add(att,SelectionKey.OP_WRITE);
                     att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
                 }catch (InterruptedException ignore) {
                     Thread.interrupted();
@@ -84,22 +121,14 @@
             if (timedout) 
                 throw new SocketTimeoutException();
         } finally {
+            poller.remove(att,SelectionKey.OP_WRITE);
             if (timedout && key != null) {
-                cancelKey(socket, key);
+                poller.cancelKey(socket, key);
             }
         }
         return written;
     }
 
-    private static void cancelKey(final NioChannel socket, final SelectionKey key) {
-        socket.getPoller().addEvent(
-            new Runnable() {
-            public void run() {
-                key.cancel();
-            }
-        });
-    }
-
     /**
      * Performs a blocking read using the bytebuffer for data to be read
      * If the <code>selector</code> parameter is null, then it will perform a busy read that could
@@ -113,8 +142,10 @@
      * @throws SocketTimeoutException if the read times out
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
-    public static int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
-        final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+    public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
+        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+        if ( key == null ) throw new IOException("Key no longer registered");
+        KeyAttachment att = (KeyAttachment) key.attachment();
         int read = 0;
         boolean timedout = false;
         int keycount = 1; //assume we can write
@@ -129,10 +160,9 @@
                     if (cnt > 0)
                         break;
                 }
-                KeyAttachment att = (KeyAttachment) key.attachment();
                 try {
                     if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1);
-                    if ( att.interestOps() == 0) socket.getPoller().add(socket,SelectionKey.OP_READ);
+                    poller.add(att,SelectionKey.OP_READ);
                     att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
                 }catch (InterruptedException ignore) {
                     Thread.interrupted();
@@ -151,11 +181,187 @@
             if (timedout)
                 throw new SocketTimeoutException();
         } finally {
+            poller.remove(att,SelectionKey.OP_READ);
             if (timedout && key != null) {
-                cancelKey(socket,key);
+                poller.cancelKey(socket,key);
             }
         }
         return read;
+    }
+
+    
+    protected class BlockPoller extends Thread { // Thread that runs queued interest-op changes against `selector` and counts down an attachment's read/write latch when its key is ready.
+        protected boolean run = true; // loop flag for run(); cleared by disable()
+        protected Selector selector = null; // assigned by the enclosing open() before the thread is started
+        protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue(); // pending Runnable tasks, drained by events()
+        public void disable() { run = false; selector.wakeup();} // stops the loop and wakes the selector so run() can notice
+        protected AtomicInteger wakeupCounter = new AtomicInteger(0); // run() sets it to -1 just before a blocking select and to 0 after any select; wakeup() increments it
+        public void cancelKey(final NioChannel socket, final SelectionKey key) { // queues a task that cancels key; socket is not used in this method
+            Runnable r = new Runnable() {
+                public void run() {
+                    key.cancel();
+                }
+            };
+            events.offer(r);
+            wakeup();
+        }
+
+        public void wakeup() { // records a pending wakeup
+            if (wakeupCounter.addAndGet(1)==0) selector.wakeup(); // only the increment that moves -1 to 0 (run() flagged a blocking select) calls selector.wakeup()
+        }
+
+        public void cancel(SelectionKey sk, KeyAttachment key, int ops){ // cancels sk, detaches its attachment and counts down the latches selected by ops; no-op when sk is null
+            if (sk!=null) {
+                sk.cancel();
+                sk.attach(null);
+                if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch());
+                if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+            }
+        }
+        
+        public void add(final KeyAttachment key, final int ops) { // queues a task that registers interest in ops for key's channel, then wakes the poller
+            Runnable r = new Runnable() {
+                public void run() {
+                    if ( key == null ) return; // nothing to do without an attachment or its channels
+                    NioChannel nch = key.getChannel();
+                    if ( nch == null ) return;
+                    SocketChannel ch = nch.getIOChannel();
+                    if ( ch == null ) return;
+                    SelectionKey sk = ch.keyFor(selector);
+                    try {
+                        if (sk == null) {
+                            sk = ch.register(selector, ops, key); // first registration with this selector; key becomes the attachment
+                        } else {
+                            sk.interestOps(sk.interestOps() | ops); // already registered: OR the new ops into the existing interest set
+                        }
+                    }catch (CancelledKeyException cx) {
+                        cancel(sk,key,ops); // cancel() counts down the latches for ops when sk is non-null
+                    }catch (ClosedChannelException cx) {
+                        cancel(sk,key,ops);
+                    }
+                }
+            };
+            events.offer(r);
+            wakeup();
+        }
+        
+        public void remove(final KeyAttachment key, final int ops) { // queues a task that clears ops from the key and counts down the matching latches, then wakes the poller
+            Runnable r = new Runnable() {
+                public void run() {
+                    if ( key == null ) return; // nothing to do without an attachment or its channels
+                    NioChannel nch = key.getChannel();
+                    if ( nch == null ) return;
+                    SocketChannel ch = nch.getIOChannel();
+                    if ( ch == null ) return;
+                    SelectionKey sk = ch.keyFor(selector);
+                    try {
+                        if (sk == null) { // not registered with this selector: only count down the latches
+                            if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch());
+                            if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+                        } else {
+                            sk.interestOps(sk.interestOps() & (~ops)); // drop ops from the interest set
+                            if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch());
+                            if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+                            if (sk.interestOps()==0) { // no interest left: cancel the key and detach the attachment
+                                sk.cancel();
+                                sk.attach(null);
+                            }
+                        }
+                    }catch (CancelledKeyException cx) {
+                        if (sk!=null) {
+                            sk.cancel();
+                            sk.attach(null);
+                        }
+                    }
+                }
+            };
+            events.offer(r);
+            wakeup();
+        }
+
+
+        public boolean events() { // runs every queued task; returns true if the queue was non-empty on entry or any task ran
+            boolean result = false;
+            Runnable r = null;
+            result = (events.size() > 0);
+            while ( (r = (Runnable)events.poll()) != null ) {
+                r.run();
+                result = true;
+            }
+            return result;
+        }
+
+        public void run() { // poller loop: drain tasks, select, then count down latches for ready keys until disable() is called
+            while (run) {
+                try {
+                    events();
+                    int keyCount = 0;
+                    try {
+                        int i = wakeupCounter.get();
+                        if (i>0) 
+                            keyCount = selector.selectNow(); // wakeups were requested since the last select: do not block
+                        else {
+                            wakeupCounter.set(-1); // flag the upcoming blocking select so wakeup() knows to call selector.wakeup()
+                            keyCount = selector.select(1000); // block for at most 1000 ms
+                        }
+                        wakeupCounter.set(0);
+                        if (!run) break;
+                    }catch ( NullPointerException x ) {
+                        //sun bug 5076772 on windows JDK 1.5
+                        if (selector==null) throw x;
+                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
+                        continue;
+                    } catch ( CancelledKeyException x ) {
+                        //sun bug 5076772 on windows JDK 1.5
+                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
+                        continue;
+                    } catch (Throwable x) {
+                        log.error("",x);
+                        continue;
+                    }
+
+                    Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // null when the select reported no ready keys
+
+                    // Walk through the collection of ready keys and dispatch
+                    // any active event.
+                    while (run && iterator != null && iterator.hasNext()) {
+                        SelectionKey sk = (SelectionKey) iterator.next();
+                        KeyAttachment attachment = (KeyAttachment)sk.attachment();
+                        try {
+                            attachment.access();
+                            iterator.remove(); ;
+                            sk.interestOps(sk.interestOps() & (~sk.readyOps())); // stop selecting for the ops that just became ready
+                            if ( sk.isReadable() ) {
+                                countDown(attachment.getReadLatch());
+                            }
+                            if (sk.isWritable()) {
+                                countDown(attachment.getWriteLatch());
+                            }
+                        }catch (CancelledKeyException ckx) {
+                            if (sk!=null) sk.cancel();
+                            countDown(attachment.getReadLatch()); // key was cancelled: count down both latches
+                            countDown(attachment.getWriteLatch());
+                        }
+                    }//while
+                }catch ( Throwable t ) {
+                    log.error("",t); // log and keep looping
+                }
+            }
+            events.clear(); // loop has ended: discard tasks that were never run
+            try {
+                selector.selectNow();//cancel all remaining keys
+            }catch( Exception ignore ) {
+                if (log.isDebugEnabled())log.debug("",ignore);
+            }
+        }
+        
+        public void countDown(CountDownLatch latch) { // null-tolerant latch.countDown()
+            if ( latch == null ) return;
+            latch.countDown();
+        }
+        
+        
+        
+    }
 
 }

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java?rev=596486&r1=596485&r2=596486&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java Mon Nov 19 15:28:21 2007
@@ -27,6 +27,7 @@
 import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
 import java.nio.channels.Selector;
 import java.nio.channels.SelectionKey;
+import org.apache.tomcat.util.MutableInteger;
 
 /**
  * 
@@ -70,7 +71,8 @@
      * been flushed out and is empty
      * @return boolean
      */
-    public boolean flush(boolean block, Selector s,long timeout) throws IOException {
+    public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException {
+        if (lastWrite!=null) lastWrite.set(1);
         return true; //no network buffer in the regular channel
     }
 

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=596486&r1=596485&r2=596486&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Nov 19 15:28:21 2007
@@ -103,7 +103,8 @@
      */
     public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
 
-    public static final int OP_REGISTER = -1; //register interest op
+    public static final int OP_REGISTER = 0x100; //register interest op
+    public static final int OP_CALLBACK = 0x200; //callback interest op
     
     // ----------------------------------------------------------------- Fields
 
@@ -183,7 +184,10 @@
      */
     long lastParachuteCheck = System.currentTimeMillis();
     
-    
+    /**
+     * Keep track of how many threads are in use
+     */
+    protected AtomicInteger activeSocketProcessors = new AtomicInteger(0);
     
     
     
@@ -478,24 +482,11 @@
     /**
      * The socket poller.
      */
-    protected Poller[] pollers = null;
-    protected int pollerRoundRobin = 0;
+    protected Poller poller = null;
     public Poller getPoller0() {
-        pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
-        Poller poller = pollers[pollerRoundRobin];
-        return poller;
-    }
-
-
-    /**
-     * The socket poller used for Comet support.
-     */
-    public Poller getCometPoller0() {
-        Poller poller = getPoller0();
         return poller;
     }
 
-
     /**
      * Dummy maxSpareThreads property.
      */
@@ -557,13 +548,15 @@
     protected String truststoreType = System.getProperty("javax.net.ssl.trustStoreType");
     public void setTruststoreType(String truststoreType) {this.truststoreType = truststoreType;}
     public String getTruststoreType() {return truststoreType;}
-    
+
     protected String keystoreFile = System.getProperty("user.home")+"/.keystore";
     public String getKeystoreFile() { return keystoreFile;}
     public void setKeystoreFile(String s ) { 
         s = adjustRelativePath(s,System.getProperty("catalina.base"));
         this.keystoreFile = s; 
     }
+    public void setKeystore(String s ) { setKeystoreFile(s);}
+    public String getKeystore() { return getKeystoreFile();}
     
     protected String algorithm = "SunX509";
     public String getAlgorithm() { return algorithm;}
@@ -641,6 +634,7 @@
         this.oomParachuteData = oomParachuteData;
     }
 
+
     protected SSLContext sslContext = null;
     public SSLContext getSSLContext() { return sslContext;}
     public void setSSLContext(SSLContext c) { sslContext = c;}
@@ -679,14 +673,10 @@
      * Number of keepalive sockets.
      */
     public int getKeepAliveCount() {
-        if (pollers == null) {
+        if (poller == null) {
             return 0;
         } else {
-            int keepAliveCount = 0;
-            for (int i = 0; i < pollers.length; i++) {
-                keepAliveCount += pollers[i].getKeepAliveCount();
-            }
-            return keepAliveCount;
+                return poller.selector.keys().size();
         }
     }
 
@@ -767,15 +757,11 @@
             // Initialize SSL
             char[] passphrase = getKeystorePass().toCharArray();
 
-            KeyStore ks = KeyStore.getInstance(getKeystoreType());
-            ks.load(new FileInputStream(getKeystoreFile()), passphrase);
-
-            KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm());
-            kmf.init(ks, passphrase);
-
             char[] tpassphrase = (getTruststorePass()!=null)?getTruststorePass().toCharArray():passphrase;
             String ttype = (getTruststoreType()!=null)?getTruststoreType():getKeystoreType();
-
+            
+            KeyStore ks = KeyStore.getInstance(getKeystoreType());
+            ks.load(new FileInputStream(getKeystoreFile()), passphrase);
             KeyStore ts = null;
             if (getTruststoreFile()==null) {
                 ts = KeyStore.getInstance(getKeystoreType());
@@ -785,16 +771,18 @@
                 ts.load(new FileInputStream(getTruststoreFile()), tpassphrase);
             }
 
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm());
+            kmf.init(ks, passphrase);
+
             TrustManagerFactory tmf = TrustManagerFactory.getInstance(getAlgorithm());
             tmf.init(ts);
 
             sslContext = SSLContext.getInstance(getSslProtocol());
             sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
-
         }
         
         if (oomParachute>0) reclaimParachute(true);
-
+        selectorPool.open();
         initialized = true;
 
     }
@@ -819,7 +807,7 @@
                     TaskQueue taskqueue = new TaskQueue();
                     TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
                     executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
-                    taskqueue.setParent( (ThreadPoolExecutor) executor);
+                    taskqueue.setParent( (ThreadPoolExecutor) executor, this);
                 }
             } else if ( executor == null ) {//avoid two thread pools being created
                 workers = new WorkerStack(maxThreads);
@@ -833,16 +821,12 @@
                 acceptorThread.start();
             }
 
-            // Start poller threads
-            pollers = new Poller[pollerThreadCount];
-            for (int i = 0; i < pollerThreadCount; i++) {
-                pollers[i] = new Poller();
-                pollers[i].init();
-                Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);
-                pollerThread.setPriority(threadPriority);
-                pollerThread.setDaemon(true);
-                pollerThread.start();
-            }
+            // Start poller thread
+            poller = new Poller();
+            Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
+            pollerThread.setPriority(threadPriority);
+            pollerThread.setDaemon(true);
+            pollerThread.start();
         }
     }
 
@@ -876,10 +860,8 @@
         if (running) {
             running = false;
             unlockAccept();
-            for (int i = 0; i < pollers.length; i++) {
-                pollers[i].destroy();
-            }
-            pollers = null;
+                poller.destroy();
+            poller = null;
         }
         eventCache.clear();
         keyCache.clear();
@@ -891,10 +873,11 @@
                 ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
                 tpe.shutdown();
                 TaskQueue queue = (TaskQueue) tpe.getQueue();
-                queue.setParent(null);
+                queue.setParent(null,null);
             }
             executor = null;
         }
+        
     }
 
 
@@ -912,6 +895,7 @@
         sslContext = null;
         initialized = false;
         releaseCaches();
+        selectorPool.close();
     }
 
 
@@ -954,6 +938,7 @@
         return oomParachuteData;
     }
 
+
     /**
      * Unlock the server socket accept using a bogus connection.
      */
@@ -1043,7 +1028,7 @@
             engine.setNeedClientAuth(true);
         } else if ("want".equals(getClientAuth())) {
             engine.setWantClientAuth(true);
-        }        
+        }
         engine.setUseClientMode(false);
         if ( ciphersarr.length > 0 ) engine.setEnabledCipherSuites(ciphersarr);
         if ( sslEnabledProtocolsarr.length > 0 ) engine.setEnabledProtocols(sslEnabledProtocolsarr);
@@ -1165,6 +1150,8 @@
     
     protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
         try {
+            KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
+            attachment.setCometNotify(false); //will get reset upon next reg
             if (executor == null) {
                 getWorkerThread().assign(socket, status);
             } else {
@@ -1286,11 +1273,18 @@
                     if (key != null) {
                         final KeyAttachment att = (KeyAttachment) key.attachment();
                         if ( att!=null ) {
+                            //handle callback flag
+                            if (att.getComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
+                                att.setCometNotify(true);
+                            } else {
+                                att.setCometNotify(false);
+                            }
+                            interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
                             att.access();//to prevent timeout
                             //we are registering the key to start with, reset the fairness counter.
-                            att.setFairness(0);
-                            att.interestOps(interestOps);
-                            key.interestOps(interestOps);
+                            int ops = key.interestOps() | interestOps;
+                            att.interestOps(ops);
+                            key.interestOps(ops);
                         } else {
                             cancel = true;
                         }
@@ -1310,6 +1304,7 @@
             return super.toString()+"[intOps="+this.interestOps+"]";
         }
     }
+    
     /**
      * Poller class.
      */
@@ -1320,9 +1315,6 @@
         
         protected boolean close = false;
         protected long nextExpiration = 0;//optimize expiration handling
-
-        protected int keepAliveCount = 0;
-        public int getKeepAliveCount() { return keepAliveCount; }
         
         protected AtomicLong wakeupCounter = new AtomicLong(0l);
         
@@ -1337,14 +1329,6 @@
         public Selector getSelector() { return selector;}
 
         /**
-         * Create the poller. With some versions of APR, the maximum poller size will
-         * be 62 (reocmpiling APR is necessary to remove this limitation).
-         */
-        protected void init() {
-            keepAliveCount = 0;
-        }
-
-        /**
          * Destroy the poller.
          */
         protected void destroy() {
@@ -1359,7 +1343,20 @@
         
         public void addEvent(Runnable event) {
             events.offer(event);
-            if ( wakeupCounter.incrementAndGet() < 3 ) selector.wakeup();
+            if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
+        }
+        
+        public void cometInterest(NioChannel socket) {
+            KeyAttachment att = (KeyAttachment)socket.getAttachment(false);
+            add(socket,att.getCometOps());
+            if ( (att.getCometOps()&OP_CALLBACK) == OP_CALLBACK ) {
+                nextExpiration = 0; //force the check for faster callback
+                selector.wakeup();
+            }
+        }
+        
+        public void wakeup() {
+            selector.wakeup();
         }
 
         /**
@@ -1407,7 +1404,7 @@
             socket.setPoller(this);
             KeyAttachment key = keyCache.poll();
             final KeyAttachment ka = key!=null?key:new KeyAttachment();
-            ka.reset(this,socket);
+            ka.reset(this,socket,getSocketProperties().getSoTimeout());
             PollerEvent r = eventCache.poll();
             ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
             if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
@@ -1422,9 +1419,14 @@
                     //the comet event takes care of clean up
                     //processSocket(ka.getChannel(), status, dispatch);
                     ka.setComet(false);//to avoid a loop
-                    processSocket(ka.getChannel(), status, false);//don't dispatch if the lines below are cancelling the key
-                    if (status == SocketStatus.TIMEOUT ) return; // don't close on comet timeout
+                    if (status == SocketStatus.TIMEOUT ) {
+                        processSocket(ka.getChannel(), status, true);
+                        return; // don't close on comet timeout
+                    } else {
+                        processSocket(ka.getChannel(), status, false); //don't dispatch if the lines below are cancelling the key
+                    }                    
                 }
+                handler.release(ka.getChannel());
                 if (key.isValid()) key.cancel();
                 if (key.channel().isOpen()) try {key.channel().close();}catch (Exception ignore){}
                 try {ka.channel.close(true);}catch (Exception ignore){}
@@ -1462,7 +1464,14 @@
                     int keyCount = 0;
                     try {
                         if ( !close ) {
-                            keyCount = selector.select(selectorTimeout);
+                            if (wakeupCounter.get()>0) {
+                                //if we are here, it means we have other stuff to do
+                                //do a non blocking select
+                                keyCount = selector.selectNow();
+                            }else {
+                                wakeupCounter.set( -1);
+                                keyCount = selector.select(selectorTimeout);
+                            }
                             wakeupCounter.set(0);
                         }
                         if (close) {
@@ -1473,10 +1482,12 @@
                         }
                     } catch ( NullPointerException x ) {
                         //sun bug 5076772 on windows JDK 1.5
+                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
                         if ( wakeupCounter == null || selector == null ) throw x;
                         continue;
                     } catch ( CancelledKeyException x ) {
                         //sun bug 5076772 on windows JDK 1.5
+                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
                         if ( wakeupCounter == null || selector == null ) throw x;
                         continue;
                     } catch (Throwable x) {
@@ -1492,6 +1503,7 @@
                     while (iterator != null && iterator.hasNext()) {
                         SelectionKey sk = (SelectionKey) iterator.next();
                         KeyAttachment attachment = (KeyAttachment)sk.attachment();
+                        attachment.access();
                         iterator.remove();
                         processKey(sk, attachment);
                     }//while
@@ -1529,24 +1541,25 @@
                     sk.attach(attachment);//cant remember why this is here
                     NioChannel channel = attachment.getChannel();
                     if (sk.isReadable() || sk.isWritable() ) {
-                        if ( sk.isReadable() && attachment.getReadLatch() != null ) {
-                            unreg(sk, attachment,SelectionKey.OP_READ);
-                            attachment.getReadLatch().countDown();
-                        } else if ( sk.isWritable() && attachment.getWriteLatch() != null ) {
-                            unreg(sk, attachment,SelectionKey.OP_WRITE);
-                            attachment.getWriteLatch().countDown();
-                        } else if ( attachment.getSendfileData() != null ) {
+                        if ( attachment.getSendfileData() != null ) {
                             processSendfile(sk,attachment,true);
                         } else if ( attachment.getComet() ) {
                             //check if thread is available
                             if ( isWorkerAvailable() ) {
-                                unreg(sk, attachment, sk.readyOps());
-                                if (!processSocket(channel, SocketStatus.OPEN))
-                                    processSocket(channel, SocketStatus.DISCONNECT);
-                                attachment.setFairness(0);
+                                //set interest ops to 0 so we don't get multiple
+                                //invocations for both read and write on separate threads
+                                reg(sk, attachment, 0);
+                                //read goes before write
+                                if (sk.isReadable()) {
+                                    //read notification
+                                    if (!processSocket(channel, SocketStatus.OPEN))
+                                        processSocket(channel, SocketStatus.DISCONNECT);
+                                } else {
+                                    //future placement of a WRITE notif
+                                    if (!processSocket(channel, SocketStatus.OPEN))
+                                        processSocket(channel, SocketStatus.DISCONNECT);
+                                }
                             } else {
-                                //increase the fairness counter
-                                attachment.incFairness();
                                 result = false;
                             }
                         } else {
@@ -1557,10 +1570,7 @@
                                 if (close) {
                                     cancelledKey(sk,SocketStatus.DISCONNECT,false);
                                 }
-                                attachment.setFairness(0);
                             } else {
-                                //increase the fairness counter
-                                attachment.incFairness();
                                 result = false;
                             }
                         }
@@ -1633,7 +1643,7 @@
             if ( ((keyCount>0 || hasEvents) ||(now < nextExpiration)) && (!close) ) {
                 return;
             }
-            long prevExp = nextExpiration;
+            long prevExp = nextExpiration; //for logging purposes only
             nextExpiration = now + socketProperties.getTimeoutInterval();
             //timeout
             Set<SelectionKey> keys = selector.keys();
@@ -1647,6 +1657,10 @@
                         cancelledKey(key, SocketStatus.ERROR,false); //we don't support any keys without attachments
                     } else if ( ka.getError() ) {
                         cancelledKey(key, SocketStatus.ERROR,true);
+                    } else if (ka.getComet() && ka.getCometNotify() ) {
+                        reg(key,ka,0);//avoid multiple calls, this gets reregistered after invocation
+                        //if (!processSocket(ka.getChannel(), SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT);
+                        if (!processSocket(ka.getChannel(), SocketStatus.OPEN)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT);
                     }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                         //only timeout sockets that we are waiting for a read from
                         long delta = now - ka.getLastAccess();
@@ -1669,7 +1683,7 @@
                     cancelledKey(key, SocketStatus.ERROR,false);
                 }
             }//for
-           if ( log.isDebugEnabled() ) log.debug("timeout completed: keycount="+keycount+"; now="+now+"; nextExpiration="+prevExp+"; "+
+            if ( log.isDebugEnabled() ) log.debug("timeout completed: keys processed="+keycount+"; now="+now+"; nextExpiration="+prevExp+"; "+
                                                   "keyCount="+keyCount+"; hasEvents="+hasEvents +"; eval="+( (now < prevExp) && (keyCount>0 || hasEvents) && (!close) ));
 
         }
@@ -1681,25 +1695,27 @@
         public KeyAttachment() {
             
         }
-        public void reset(Poller poller, NioChannel channel) {
+        public void reset(Poller poller, NioChannel channel, long soTimeout) {
             this.channel = channel;
             this.poller = poller;
             lastAccess = System.currentTimeMillis();
             currentAccess = false;
             comet = false;
-            timeout = -1;
+            timeout = soTimeout;
             error = false;
-            fairness = 0;
             lastRegistered = 0;
             sendfileData = null;
             if ( readLatch!=null ) try {for (int i=0; i<(int)readLatch.getCount();i++) readLatch.countDown();}catch (Exception ignore){}
             readLatch = null;
             if ( writeLatch!=null ) try {for (int i=0; i<(int)writeLatch.getCount();i++) writeLatch.countDown();}catch (Exception ignore){}
             writeLatch = null;
+            cometNotify = false;
+            cometOps = SelectionKey.OP_READ;
+            sendfileData = null;
         }
         
         public void reset() {
-            reset(null,null);
+            reset(null,null,-1);
         }
         
         public Poller getPoller() { return poller;}
@@ -1709,9 +1725,12 @@
         public void access(long access) { lastAccess = access; }
         public void setComet(boolean comet) { this.comet = comet; }
         public boolean getComet() { return comet; }
+        public void setCometNotify(boolean notify) { this.cometNotify = notify; }
+        public boolean getCometNotify() { return cometNotify; }
+        public void setCometOps(int ops) { this.cometOps = ops; }
+        public int getCometOps() { return cometOps; }
         public boolean getCurrentAccess() { return currentAccess; }
         public void setCurrentAccess(boolean access) { currentAccess = access; }
-        public Object getMutex() {return mutex;}
         public void setTimeout(long timeout) {this.timeout = timeout;}
         public long getTimeout() {return this.timeout;}
         public boolean getError() { return error; }
@@ -1725,7 +1744,7 @@
         public CountDownLatch getReadLatch() { return readLatch; }
         public CountDownLatch getWriteLatch() { return writeLatch; }
         protected CountDownLatch resetLatch(CountDownLatch latch) {
-            if ( latch.getCount() == 0 ) return null;
+            if ( latch==null || latch.getCount() == 0 ) return null;
             else throw new IllegalStateException("Latch must be at count 0");
         }
         public void resetReadLatch() { readLatch = resetLatch(readLatch); }
@@ -1748,25 +1767,22 @@
         public void awaitReadLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(readLatch,timeout,unit);}
         public void awaitWriteLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(writeLatch,timeout,unit);}
         
-        public int getFairness() { return fairness; }
-        public void setFairness(int f) { fairness = f;}
-        public void incFairness() { fairness++; }
         public long getLastRegistered() { return lastRegistered; };
         public void setLastRegistered(long reg) { lastRegistered = reg; }
         
         public void setSendfileData(SendfileData sf) { this.sendfileData = sf;}
         public SendfileData getSendfileData() { return this.sendfileData;}
         
-        protected Object mutex = new Object();
         protected long lastAccess = -1;
         protected boolean currentAccess = false;
         protected boolean comet = false;
+        protected int cometOps = SelectionKey.OP_READ;
+        protected boolean cometNotify = false;
         protected long timeout = -1;
         protected boolean error = false;
         protected NioChannel channel = null;
         protected CountDownLatch readLatch = null;
         protected CountDownLatch writeLatch = null;
-        protected int fairness = 0;
         protected long lastRegistered = 0;
         protected SendfileData sendfileData = null;
     }
@@ -1963,6 +1979,7 @@
         public SocketState process(NioChannel socket);
         public SocketState event(NioChannel socket, SocketStatus status);
         public void releaseCaches();
+        public void release(NioChannel socket);
     }
 
 
@@ -2044,6 +2061,7 @@
         }
          
         public void run() {
+            NioEndpoint.this.activeSocketProcessors.addAndGet(1);
             SelectionKey key = null;
             try {
                 key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
@@ -2117,7 +2135,7 @@
                 status = null;
                 //return to cache
                 processorCache.offer(this);
-            }
+                NioEndpoint.this.activeSocketProcessors.addAndGet(-1);            }
         }
 
     }
@@ -2125,6 +2143,7 @@
     // ---------------------------------------------- TaskQueue Inner Class
     public static class TaskQueue extends LinkedBlockingQueue<Runnable> {
         ThreadPoolExecutor parent = null;
+        NioEndpoint endpoint = null;
         
         public TaskQueue() {
             super();
@@ -2139,8 +2158,9 @@
         }
 
         
-        public void setParent(ThreadPoolExecutor tp) {
+        public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) {
             parent = tp;
+            this.endpoint = ep;
         }
         
         public boolean offer(Runnable o) {
@@ -2150,7 +2170,7 @@
             if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
             //we have idle threads, just add it to the queue
             //this is an approximation, so it could use some tuning
-            if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o);
+            if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) return super.offer(o);
             //if we have less threads than maximum force creation of a new thread
             if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
             //if we reached here, we need to add it to the queue

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java?rev=596486&r1=596485&r2=596486&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java Mon Nov 19 15:28:21 2007
@@ -16,17 +16,19 @@
  */
 package org.apache.tomcat.util.net;
 
-import java.util.concurrent.atomic.AtomicInteger;
-import java.nio.channels.Selector;
+import java.io.EOFException;
 import java.io.IOException;
-import java.util.NoSuchElementException;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.io.EOFException;
-import java.net.SocketTimeoutException;
+import java.nio.channels.Selector;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.MutableInteger;
 
 /**
  *
@@ -37,20 +39,30 @@
  */
 
 public class NioSelectorPool {
+    
+    public NioSelectorPool() {
+    }
+    
+    protected static int threadCount = 0;
+    
     protected static Log log = LogFactory.getLog(NioSelectorPool.class);
 
     protected final static boolean SHARED =
         Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue();
-    protected static Selector SHARED_SELECTOR;
+    
+    protected NioBlockingSelector blockingSelector;
+    
+    protected Selector SHARED_SELECTOR;
     
     protected int maxSelectors = 200;
+    protected long sharedSelectorTimeout = 30000;
     protected int maxSpareSelectors = -1;
     protected boolean enabled = true;
     protected AtomicInteger active = new AtomicInteger(0);
     protected AtomicInteger spare = new AtomicInteger(0);
     protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
 
-    protected static Selector getSharedSelector() throws IOException {
+    protected Selector getSharedSelector() throws IOException {
         if (SHARED && SHARED_SELECTOR == null) {
             synchronized ( NioSelectorPool.class ) {
                 if ( SHARED_SELECTOR == null )  {
@@ -102,6 +114,9 @@
         while ( (s = selectors.poll()) != null ) s.close();
         spare.set(0);
         active.set(0);
+        if (blockingSelector!=null) {
+            blockingSelector.close();
+        }
         if ( SHARED && getSharedSelector()!=null ) {
             getSharedSelector().close();
             SHARED_SELECTOR = null;
@@ -111,6 +126,11 @@
     public void open() throws IOException {
         enabled = true;
         getSharedSelector();
+        if (SHARED) {
+            blockingSelector = new NioBlockingSelector();
+            blockingSelector.open(getSharedSelector());
+        }
+
     }
 
     /**
@@ -127,12 +147,13 @@
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
     public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout) throws IOException {
-        return write(buf,socket,selector,writeTimeout,true);
+        return write(buf,socket,selector,writeTimeout,true,null);
     }
     
-    public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout, boolean block) throws IOException {
-        if ( SHARED && block) {
-            return NioBlockingSelector.write(buf,socket,writeTimeout);
+    public int write(ByteBuffer buf, NioChannel socket, Selector selector, 
+                     long writeTimeout, boolean block,MutableInteger lastWrite) throws IOException {
+        if ( SHARED && block ) {
+            return blockingSelector.write(buf,socket,writeTimeout,lastWrite);
         }
         SelectionKey key = null;
         int written = 0;
@@ -144,7 +165,9 @@
                 int cnt = 0;
                 if ( keycount > 0 ) { //only write if we were registered for a write
                     cnt = socket.write(buf); //write the data
+                    if (lastWrite!=null) lastWrite.set(cnt);
                     if (cnt == -1) throw new EOFException();
+                    
                     written += cnt;
                     if (cnt > 0) {
                         time = System.currentTimeMillis(); //reset our timeout timer
@@ -202,8 +225,8 @@
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
     public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException {
-        if ( SHARED && block) {
-            return NioBlockingSelector.read(buf,socket,readTimeout);
+        if ( SHARED && block ) {
+            return blockingSelector.read(buf,socket,readTimeout);
         }
         SelectionKey key = null;
         int read = 0;
@@ -250,6 +273,10 @@
         this.enabled = enabled;
     }
 
+    public void setSharedSelectorTimeout(long sharedSelectorTimeout) {
+        this.sharedSelectorTimeout = sharedSelectorTimeout;
+    }
+
     public int getMaxSelectors() {
         return maxSelectors;
     }
@@ -260,5 +287,17 @@
 
     public boolean isEnabled() {
         return enabled;
+    }
+
+    public long getSharedSelectorTimeout() {
+        return sharedSelectorTimeout;
+    }
+
+    public ConcurrentLinkedQueue getSelectors() {
+        return selectors;
+    }
+
+    public AtomicInteger getSpare() {
+        return spare;
     }
 }

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?rev=596486&r1=596485&r2=596486&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Mon Nov 19 15:28:21 2007
@@ -25,6 +25,7 @@
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLEngineResult.Status;
 import java.nio.channels.Selector;
+import org.apache.tomcat.util.MutableInteger;
 
 /**
  * 
@@ -102,11 +103,11 @@
      * been flushed out and is empty
      * @return boolean
      */
-    public boolean flush(boolean block, Selector s, long timeout) throws IOException {
+    public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException {
         if (!block) {
             flush(netOutBuffer);
         } else {
-            pool.write(netOutBuffer, this, s, timeout);
+            pool.write(netOutBuffer, this, s, timeout,block,lastWrite);
         }
         return !netOutBuffer.hasRemaining();
     }
@@ -402,32 +403,33 @@
             if ( src != bufHandler.getWriteBuffer() ) throw new IllegalArgumentException("You can only write using the application write buffer provided by the handler.");
             //are we closing or closed?
             if ( closing || closed) throw new IOException("Channel is in closing state.");
-    
+
             //the number of bytes written
             int written = 0;
-    
+
             if (!flush(netOutBuffer)) {
                 //we haven't emptied out the buffer yet
                 return written;
             }
-    
+
             /*
              * The data buffer is empty, we can reuse the entire buffer.
              */
             netOutBuffer.clear();
-    
+
             SSLEngineResult result = sslEngine.wrap(src, netOutBuffer);
             written = result.bytesConsumed();
             netOutBuffer.flip();
-    
+
             if (result.getStatus() == Status.OK) {
                 if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) tasks();
             } else {
                 throw new IOException("Unable to wrap data, invalid engine state: " +result.getStatus());
             }        
-    
+
             //force a flush
             flush(netOutBuffer);
+
             return written;
         }
     }

Modified: tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml?rev=596486&r1=596485&r2=596486&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml Mon Nov 19 15:28:21 2007
@@ -35,6 +35,11 @@
 <section name="Tomcat 6.0.16 (remm)">
   <subsection name="General">
     <changelog>
+      <fix><bug>43846</bug>
+        Fix simulated blocking reads and writes causing timeouts.
+        Add non blocking parsing of HTTP request headers.
+        Perf improvements(fhanik)
+      </fix>
       <update>
         Cookie handling/parsing changes!
         The following behavior has been changed with regards to Tomcat's cookie handling



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message