Return-Path: Delivered-To: apmail-tomcat-dev-archive@www.apache.org Received: (qmail 39929 invoked from network); 19 Nov 2007 23:29:12 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 19 Nov 2007 23:29:12 -0000 Received: (qmail 6604 invoked by uid 500); 19 Nov 2007 23:28:53 -0000 Delivered-To: apmail-tomcat-dev-archive@tomcat.apache.org Received: (qmail 6548 invoked by uid 500); 19 Nov 2007 23:28:53 -0000 Mailing-List: contact dev-help@tomcat.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: "Tomcat Developers List" Delivered-To: mailing list dev@tomcat.apache.org Received: (qmail 6514 invoked by uid 500); 19 Nov 2007 23:28:53 -0000 Delivered-To: apmail-jakarta-tomcat-dev@jakarta.apache.org Received: (qmail 6490 invoked by uid 99); 19 Nov 2007 23:28:53 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Nov 2007 15:28:53 -0800 X-ASF-Spam-Status: No, hits=-99.3 required=10.0 tests=ALL_TRUSTED,FRT_LEVITRA X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Nov 2007 23:28:50 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8EE5B1A9832; Mon, 19 Nov 2007 15:28:28 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r596486 - in /tomcat/tc6.0.x/trunk: ./ java/org/apache/coyote/http11/ java/org/apache/tomcat/util/ java/org/apache/tomcat/util/net/ webapps/docs/ Date: Mon, 19 Nov 2007 23:28:26 -0000 To: tomcat-dev@jakarta.apache.org From: fhanik@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071119232828.8EE5B1A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: fhanik Date: Mon Nov 19 15:28:21 2007 New Revision: 596486 URL: http://svn.apache.org/viewvc?rev=596486&view=rev Log: Fix bug Added: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/MutableInteger.java Modified: tomcat/tc6.0.x/trunk/STATUS.txt tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml Modified: tomcat/tc6.0.x/trunk/STATUS.txt URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/STATUS.txt?rev=596486&r1=596485&r2=596486&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/STATUS.txt (original) +++ tomcat/tc6.0.x/trunk/STATUS.txt Mon Nov 19 15:28:21 2007 @@ -35,15 +35,6 @@ http://people.apache.org/~jfclere/patches/test_cookies.patch +1: jfclere -1: fhanik - Can we add the 'package' directive to make the package match the dir structure - -* Fix BZ 43846 - Fix output of data on simulated blocking IO - Improve speed of writing and reading - Add in non blocking request header parsing - This is a port of improvements from the old trunk and fixes the BZ above and improves greatly on the NIO connector - http://people.apache.org/~fhanik/patches/fix-nio-blocking-output.patch - +1: fhanik, jim, pero - -1: * Fix Comet bug, if servlet calls cometEvent.close() on the BEGIN event, the request still is marked as comet and ends up in a funky state http://people.apache.org/~fhanik/patches/comet-begin-event-close.patch Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?rev=596486&r1=596485&r2=596486&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java Mon Nov 19 15:28:21 2007 @@ -90,12 +90,12 @@ request = new Request(); int readTimeout = endpoint.getSoTimeout(); - inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize,readTimeout); + inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize); request.setInputBuffer(inputBuffer); response = new Response(); response.setHook(this); - outputBuffer = new InternalNioOutputBuffer(response, maxHttpHeaderSize,readTimeout); + outputBuffer = new InternalNioOutputBuffer(response, maxHttpHeaderSize); response.setOutputBuffer(outputBuffer); request.setResponse(response); @@ -751,10 +751,7 @@ NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false); if (attach != null) { attach.setComet(comet); - if (comet) { - Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout"); - if (comettimeout != null) attach.setTimeout(comettimeout.longValue()); - } else { + if (!comet) { //reset the timeout attach.setTimeout(endpoint.getSocketProperties().getSoTimeout()); } @@ -794,14 +791,6 @@ RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); - // Set the remote address - remoteAddr = null; - remoteHost = null; - localAddr = null; - localName = null; - remotePort = -1; - localPort = -1; - // Setting up the socket this.socket = socket; inputBuffer.setSocket(socket); @@ -829,30 +818,30 @@ try { if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) { socket.getIOChannel().socket().setSoTimeout((int)soTimeout); - inputBuffer.readTimeout = soTimeout; } - if (!inputBuffer.parseRequestLine(keptAlive && (endpoint.getCurrentThreadsBusy() >= limit))) { - // This means that no data is available right now - // (long keepalive), so that the processor should be recycled - // and the method should return true + if (!inputBuffer.parseRequestLine(keptAlive)) { + //no data available yet, since we might have read part + //of the request line, we can't recycle the processor openSocket = true; - // Add the socket to the poller - socket.getPoller().add(socket); + recycle = false; break; } keptAlive = true; if ( !inputBuffer.parseHeaders() ) { + //we've read part of the request, don't recycle it + //instead associate it with the socket openSocket = true; - socket.getPoller().add(socket); recycle = false; break; } request.setStartTime(System.currentTimeMillis()); if (!disableUploadTimeout) { //only for body, not for request headers socket.getIOChannel().socket().setSoTimeout((int)timeout); - inputBuffer.readTimeout = soTimeout; } } catch (IOException e) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("http11processor.header.parse"), e); + } error = true; break; } catch (Throwable t) { @@ -900,10 +889,6 @@ NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); if (attach != null) { attach.setComet(comet); - if (comet) { - Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout"); - if (comettimeout != null) attach.setTimeout(comettimeout.longValue()); - } } } } catch (InterruptedIOException e) { @@ -961,7 +946,8 @@ } } else { if ( recycle ) recycle(); - return (openSocket) ? SocketState.OPEN : SocketState.CLOSED; + //return (openSocket) ? (SocketState.OPEN) : SocketState.CLOSED; + return (openSocket) ? (recycle?SocketState.OPEN:SocketState.LONG) : SocketState.CLOSED; } } @@ -998,6 +984,12 @@ this.socket = null; this.cometClose = false; this.comet = false; + remoteAddr = null; + remoteHost = null; + localAddr = null; + localName = null; + remotePort = -1; + localPort = -1; } @@ -1070,7 +1062,7 @@ if ( attach!=null && attach.getComet()) { //if this is a comet connection //then execute the connection closure at the next selector loop - request.getAttributes().remove("org.apache.tomcat.comet.timeout"); + //request.getAttributes().remove("org.apache.tomcat.comet.timeout"); //attach.setTimeout(5000); //force a cleanup in 5 seconds //attach.setError(true); //this has caused concurrency errors } Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java?rev=596486&r1=596485&r2=596486&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java Mon Nov 19 15:28:21 2007 @@ -95,12 +95,13 @@ * Set a property. */ public boolean setProperty(String name, String value) { - setAttribute(name, value); + setAttribute(name, value); //store all settings if ( name!=null && (name.startsWith("socket.") ||name.startsWith("selectorPool.")) ){ return ep.setProperty(name, value); } else { return ep.setProperty(name,value); //make sure we at least try to set all properties } + } /** @@ -632,6 +633,14 @@ public void releaseCaches() { recycledProcessors.clear(); } + + public void release(NioChannel socket) { + Http11NioProcessor result = connections.remove(socket); + if ( result != null ) { + result.recycle(); + recycledProcessors.offer(result); + } + } public SocketState event(NioChannel socket, SocketStatus status) { Http11NioProcessor result = connections.get(socket); @@ -671,7 +680,9 @@ } } else { if (log.isDebugEnabled()) log.debug("Keeping processor["+result); - socket.getPoller().add(socket); + //add correct poller events here based on Comet stuff + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + socket.getPoller().add(socket,att.getCometOps()); } } } @@ -681,6 +692,8 @@ public SocketState process(NioChannel socket) { Http11NioProcessor processor = null; try { + processor = connections.remove(socket); + if (processor == null) { processor = recycledProcessors.poll(); } @@ -708,9 +721,14 @@ // Associate the connection with the processor. The next request // processed by this thread will use either a new or a recycled // processor. - if (log.isDebugEnabled()) log.debug("Not recycling ["+processor+"] Comet="+((NioEndpoint.KeyAttachment)socket.getAttachment(false)).getComet()); + //if (log.isDebugEnabled()) log.debug("Not recycling ["+processor+"] Comet="+((NioEndpoint.KeyAttachment)socket.getAttachment(false)).getComet()); connections.put(socket, processor); - socket.getPoller().add(socket); + if (processor.comet) { + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + socket.getPoller().add(socket,att.getCometOps()); + } else { + socket.getPoller().add(socket); + } } else { recycledProcessors.offer(processor); } Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?rev=596486&r1=596485&r2=596486&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java Mon Nov 19 15:28:21 2007 @@ -30,6 +30,7 @@ import org.apache.tomcat.util.net.NioChannel; import org.apache.tomcat.util.net.NioSelectorPool; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.net.NioEndpoint; /** * Implementation of InputBuffer which provides HTTP request header parsing as @@ -51,8 +52,7 @@ /** * Alternate constructor. */ - public InternalNioInputBuffer(Request request, int headerBufferSize, - long readTimeout) { + public InternalNioInputBuffer(Request request, int headerBufferSize) { this.request = request; headers = request.getMimeHeaders(); @@ -72,16 +72,14 @@ parsingHeader = true; parsingRequestLine = true; + parsingRequestLinePhase = 0; + parsingRequestLineEol = false; + parsingRequestLineStart = 0; + parsingRequestLineQPos = -1; headerParsePos = HeaderParsePosition.HEADER_START; headerData.recycle(); swallowInput = true; - if (readTimeout < 0) { - this.readTimeout = -1; - } else { - this.readTimeout = readTimeout; - } - } @@ -111,10 +109,15 @@ /** - * State. + * Parsing state - used for non blocking parsing so that + * when more data arrives, we can pick up where we left off. */ protected boolean parsingHeader; protected boolean parsingRequestLine; + protected int parsingRequestLinePhase = 0; + protected boolean parsingRequestLineEol = false; + protected int parsingRequestLineStart = 0; + protected int parsingRequestLineQPos = -1; protected HeaderParsePosition headerParsePos; @@ -186,12 +189,6 @@ protected int lastActiveFilter; - /** - * The socket timeout used when reading the first block of the request - * header. - */ - protected long readTimeout; - // ------------------------------------------------------------- Properties @@ -287,7 +284,23 @@ } // --------------------------------------------------------- Public Methods - + /** + * Returns true if there are bytes available from the socket layer + * @return boolean + * @throws IOException + */ + public boolean isReadable() throws IOException { + return (pos < lastValid) || (nbRead()>0); + } + + /** + * Issues a non blocking read + * @return int + * @throws IOException + */ + public int nbRead() throws IOException { + return readSocket(true,false); + } /** * Recycle the input buffer. This should be called when closing the @@ -309,6 +322,10 @@ parsingHeader = true; headerParsePos = HeaderParsePosition.HEADER_START; parsingRequestLine = true; + parsingRequestLinePhase = 0; + parsingRequestLineEol = false; + parsingRequestLineStart = 0; + parsingRequestLineQPos = -1; headerData.recycle(); swallowInput = true; @@ -350,6 +367,10 @@ parsingHeader = true; headerParsePos = HeaderParsePosition.HEADER_START; parsingRequestLine = true; + parsingRequestLinePhase = 0; + parsingRequestLineEol = false; + parsingRequestLineStart = 0; + parsingRequestLineQPos = -1; headerData.recycle(); swallowInput = true; @@ -388,160 +409,137 @@ //check state if ( !parsingRequestLine ) return true; - - int start = 0; - // // Skipping blank lines // - - byte chr = 0; - do { - - // Read new bytes if needed + if ( parsingRequestLinePhase == 0 ) { + byte chr = 0; + do { + + // Read new bytes if needed + if (pos >= lastValid) { + if (useAvailableData) { + return false; + } + // Do a simple read with a short timeout + if ( readSocket(true, false)==0 ) return false; + } + chr = buf[pos++]; + } while ((chr == Constants.CR) || (chr == Constants.LF)); + pos--; + parsingRequestLineStart = pos; + parsingRequestLinePhase = 1; + } + if ( parsingRequestLinePhase == 1 ) { + // Mark the current buffer position + if (pos >= lastValid) { if (useAvailableData) { return false; } - if (readTimeout == -1) { - if (!fill(false,true)) //request line parsing - throw new EOFException(sm.getString("iib.eof.error")); - } else { - // Do a simple read with a short timeout - if ( !readSocket(true, false) ) return false; - } - } - - chr = buf[pos++]; - - } while ((chr == Constants.CR) || (chr == Constants.LF)); - - pos--; - - // Mark the current buffer position - start = pos; - - if (pos >= lastValid) { - if (useAvailableData) { - return false; - } - if (readTimeout == -1) { - if (!fill(false,true)) //request line parsing - return false; - } else { // Do a simple read with a short timeout - if ( !readSocket(true, true) ) return false; + if ( readSocket(true, false)==0 ) return false; } + parsingRequestLinePhase = 2; } - - // - // Reading the method name - // Method name is always US-ASCII - // - - boolean space = false; - - while (!space) { - - // Read new bytes if needed - if (pos >= lastValid) { - if (!fill(true,true)) //request line parsing - return false; - } - - if (buf[pos] == Constants.SP) { - space = true; - request.method().setBytes(buf, start, pos - start); + if ( parsingRequestLinePhase == 2 ) { + // + // Reading the method name + // Method name is always US-ASCII + // + boolean space = false; + while (!space) { + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill(true, false)) //request line parsing + return false; + } + if (buf[pos] == Constants.SP) { + space = true; + request.method().setBytes(buf, parsingRequestLineStart, pos - parsingRequestLineStart); + } + pos++; } - - pos++; - + parsingRequestLineStart = pos; + parsingRequestLinePhase = 3; } - - // Mark the current buffer position - start = pos; - int end = 0; - int questionPos = -1; - - // - // Reading the URI - // - - space = false; - boolean eol = false; - - while (!space) { - - // Read new bytes if needed - if (pos >= lastValid) { - if (!fill(true,true)) //request line parsing - return false; + if ( parsingRequestLinePhase == 3 ) { + // Mark the current buffer position + + int end = 0; + // + // Reading the URI + // + boolean space = false; + while (!space) { + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill(true,false)) //request line parsing + return false; + } + if (buf[pos] == Constants.SP) { + space = true; + end = pos; + } else if ((buf[pos] == Constants.CR) + || (buf[pos] == Constants.LF)) { + // HTTP/0.9 style request + parsingRequestLineEol = true; + space = true; + end = pos; + } else if ((buf[pos] == Constants.QUESTION) + && (parsingRequestLineQPos == -1)) { + parsingRequestLineQPos = pos; + } + pos++; } - - if (buf[pos] == Constants.SP) { - space = true; - end = pos; - } else if ((buf[pos] == Constants.CR) - || (buf[pos] == Constants.LF)) { - // HTTP/0.9 style request - eol = true; - space = true; - end = pos; - } else if ((buf[pos] == Constants.QUESTION) - && (questionPos == -1)) { - questionPos = pos; + request.unparsedURI().setBytes(buf, parsingRequestLineStart, end - parsingRequestLineStart); + if (parsingRequestLineQPos >= 0) { + request.queryString().setBytes(buf, parsingRequestLineQPos + 1, + end - parsingRequestLineQPos - 1); + request.requestURI().setBytes(buf, parsingRequestLineStart, parsingRequestLineQPos - parsingRequestLineStart); + } else { + request.requestURI().setBytes(buf, parsingRequestLineStart, end - parsingRequestLineStart); } - - pos++; - - } - - request.unparsedURI().setBytes(buf, start, end - start); - if (questionPos >= 0) { - request.queryString().setBytes(buf, questionPos + 1, - end - questionPos - 1); - request.requestURI().setBytes(buf, start, questionPos - start); - } else { - request.requestURI().setBytes(buf, start, end - start); + parsingRequestLineStart = pos; + parsingRequestLinePhase = 4; } - - // Mark the current buffer position - start = pos; - end = 0; - - // - // Reading the protocol - // Protocol is always US-ASCII - // - - while (!eol) { - - // Read new bytes if needed - if (pos >= lastValid) { - if (!fill(true,true)) //reques line parsing - return false; - } - - if (buf[pos] == Constants.CR) { - end = pos; - } else if (buf[pos] == Constants.LF) { - if (end == 0) + if ( parsingRequestLinePhase == 4 ) { + // Mark the current buffer position + + end = 0; + // + // Reading the protocol + // Protocol is always US-ASCII + // + while (!parsingRequestLineEol) { + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill(true, false)) //reques line parsing + return false; + } + + if (buf[pos] == Constants.CR) { end = pos; - eol = true; + } else if (buf[pos] == Constants.LF) { + if (end == 0) + end = pos; + parsingRequestLineEol = true; + } + pos++; } - - pos++; - - } - - if ((end - start) > 0) { - request.protocol().setBytes(buf, start, end - start); - } else { - request.protocol().setString(""); + + if ( (end - parsingRequestLineStart) > 0) { + request.protocol().setBytes(buf, parsingRequestLineStart, end - parsingRequestLineStart); + } else { + request.protocol().setString(""); + } + parsingRequestLine = false; + parsingRequestLinePhase = 0; + parsingRequestLineEol = false; + parsingRequestLineStart = 0; + return true; } - parsingRequestLine = false; - return true; - + throw new IllegalStateException("Invalid request line parse phase:"+parsingRequestLinePhase); } private void expand(int newsize) { @@ -552,6 +550,7 @@ tmp = null; } } + /** * Perform blocking read with a timeout if desired * @param timeout boolean - if we want to use the timeout data @@ -560,15 +559,16 @@ * @throws IOException if a socket exception occurs * @throws EOFException if end of stream is reached */ - private boolean readSocket(boolean timeout, boolean block) throws IOException { + private int readSocket(boolean timeout, boolean block) throws IOException { int nRead = 0; - long rto = timeout?this.readTimeout:-1; socket.getBufHandler().getReadBuffer().clear(); if ( block ) { Selector selector = null; try { selector = getSelectorPool().get(); }catch ( IOException x ) {} try { - nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,rto); + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + if ( att == null ) throw new IOException("Key must be cancelled."); + nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,att.getTimeout()); } catch ( EOFException eof ) { nRead = -1; } finally { @@ -583,12 +583,12 @@ expand(nRead + pos); socket.getBufHandler().getReadBuffer().get(buf, pos, nRead); lastValid = pos + nRead; - return true; + return nRead; } else if (nRead == -1) { //return false; throw new EOFException(sm.getString("iib.eof.error")); } else { - return false; + return 0; } } @@ -630,7 +630,7 @@ // Read new bytes if needed if (pos >= lastValid) { - if (!fill(true,true)) {//parse header + if (!fill(true,false)) {//parse header headerParsePos = HeaderParsePosition.HEADER_START; return HeaderParseStatus.NEED_MORE_DATA; } @@ -668,7 +668,7 @@ // Read new bytes if needed if (pos >= lastValid) { - if (!fill(true,true)) { //parse header + if (!fill(true,false)) { //parse header return HeaderParseStatus.NEED_MORE_DATA; } } @@ -708,7 +708,7 @@ // Read new bytes if needed if (pos >= lastValid) { - if (!fill(true,true)) {//parse header + if (!fill(true,false)) {//parse header //HEADER_VALUE, should already be set return HeaderParseStatus.NEED_MORE_DATA; } @@ -729,7 +729,7 @@ // Read new bytes if needed if (pos >= lastValid) { - if (!fill(true,true)) {//parse header + if (!fill(true,false)) {//parse header //HEADER_VALUE return HeaderParseStatus.NEED_MORE_DATA; } @@ -760,7 +760,7 @@ } // Read new bytes if needed if (pos >= lastValid) { - if (!fill(true,true)) {//parse header + if (!fill(true,false)) {//parse header //HEADER_MULTI_LINE return HeaderParseStatus.NEED_MORE_DATA; @@ -852,7 +852,7 @@ } // Do a simple read with a short timeout - read = readSocket(timeout,block); + read = readSocket(timeout,block)>0; } else { if (buf.length - end < 4500) { @@ -865,7 +865,7 @@ pos = end; lastValid = pos; // Do a simple read with a short timeout - read = readSocket(timeout, block); + read = readSocket(timeout, block)>0; } return read; } Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=596486&r1=596485&r2=596486&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Mon Nov 19 15:28:21 2007 @@ -34,6 +34,8 @@ import org.apache.tomcat.util.net.NioEndpoint; import org.apache.tomcat.util.net.NioSelectorPool; import org.apache.tomcat.util.res.StringManager; +import java.io.EOFException; +import org.apache.tomcat.util.MutableInteger; /** * Output buffer. @@ -56,14 +58,14 @@ * Default constructor. */ public InternalNioOutputBuffer(Response response) { - this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000); + this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE); } /** * Alternate constructor. */ - public InternalNioOutputBuffer(Response response, int headerBufferSize, long writeTimeout) { + public InternalNioOutputBuffer(Response response, int headerBufferSize) { this.response = response; headers = response.getMimeHeaders(); @@ -86,8 +88,6 @@ committed = false; finished = false; - this.writeTimeout = writeTimeout; - // Cause loading of HttpMessages HttpMessages.getMessage(200); @@ -142,6 +142,10 @@ */ protected int pos; + /** + * Number of bytes last written + */ + protected MutableInteger lastWrite = new MutableInteger(1); /** * Underlying socket. @@ -179,12 +183,6 @@ */ protected int lastActiveFilter; - /** - * Write time out in milliseconds - */ - protected long writeTimeout = -1; - - // ------------------------------------------------------------- Properties @@ -195,10 +193,6 @@ this.socket = socket; } - public void setWriteTimeout(long writeTimeout) { - this.writeTimeout = writeTimeout; - } - /** * Get the underlying socket input stream. */ @@ -206,10 +200,6 @@ return socket; } - public long getWriteTimeout() { - return writeTimeout; - } - public void setSelectorPool(NioSelectorPool pool) { this.pool = pool; } @@ -324,7 +314,6 @@ // Recycle Request object response.recycle(); - } @@ -347,6 +336,7 @@ lastActiveFilter = -1; committed = false; finished = false; + lastWrite.set(1); } @@ -405,11 +395,13 @@ } - + public boolean isWritable() { + return lastWrite.get()>0; + } // ------------------------------------------------ HTTP/1.1 Output Methods - /** + /** * Send an acknoledgement. */ public void sendAck() @@ -418,15 +410,26 @@ if (!committed) { //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0 socket.getBufHandler() .getWriteBuffer().put(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length); - writeToSocket(socket.getBufHandler() .getWriteBuffer(),true); + writeToSocket(socket.getBufHandler() .getWriteBuffer(),true,true); } } - private synchronized void writeToSocket(ByteBuffer bytebuffer, boolean flip) throws IOException { - //int limit = bytebuffer.position(); + /** + * + * @param bytebuffer ByteBuffer + * @param flip boolean + * @return int + * @throws IOException + * @todo Fix non blocking write properly + */ + private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException { if ( flip ) bytebuffer.flip(); + int written = 0; + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + if ( att == null ) throw new IOException("Key must be cancelled"); + long writeTimeout = att.getTimeout(); Selector selector = null; try { selector = getSelectorPool().get(); @@ -434,16 +437,17 @@ //ignore } try { - written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout); + written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout, block,lastWrite); //make sure we are flushed do { - if (socket.flush(true,selector,writeTimeout)) break; + if (socket.flush(true,selector,writeTimeout,lastWrite)) break; }while ( true ); }finally { if ( selector != null ) getSelectorPool().put(selector); } - socket.getBufHandler().getWriteBuffer().clear(); + if ( block ) bytebuffer.clear(); //only clear this.total = 0; + return written; } @@ -762,7 +766,8 @@ //write to the socket, if there is anything to write if (socket.getBufHandler().getWriteBuffer().position() > 0) { - writeToSocket(socket.getBufHandler().getWriteBuffer(),true); + socket.getBufHandler().getWriteBuffer().flip(); + writeToSocket(socket.getBufHandler().getWriteBuffer(),true, false); } } Added: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/MutableInteger.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/MutableInteger.java?rev=596486&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/MutableInteger.java (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/MutableInteger.java Mon Nov 19 15:28:21 2007 @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tomcat.util; + +public class MutableInteger { + protected int value = 0; + public MutableInteger() {} + public MutableInteger(int val) { + this.value = val; + } + + public int get() { return value;} + public void set(int val) {this.value = val;} +} Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?rev=596486&r1=596485&r2=596486&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Mon Nov 19 15:28:21 2007 @@ -20,13 +20,50 @@ import java.io.IOException; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.MutableInteger; import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment; +import java.util.concurrent.atomic.AtomicInteger; public class NioBlockingSelector { + + protected static Log log = LogFactory.getLog(NioBlockingSelector.class); + + private static int threadCounter = 0; + + protected Selector sharedSelector; + + protected BlockPoller poller; public NioBlockingSelector() { + + } + + public void open(Selector selector) { + sharedSelector = selector; + poller = new BlockPoller(); + poller.selector = sharedSelector; + poller.setDaemon(true); + poller.setName("NioBlockingSelector.BlockPoller-"+(++threadCounter)); + poller.start(); + } + + public void close() { + if (poller!=null) { + poller.disable(); + poller.interrupt(); + poller = null; + } } /** @@ -41,8 +78,10 @@ * @throws SocketTimeoutException if the write times out * @throws IOException if an IO Exception occurs in the underlying socket logic */ - public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout) throws IOException { + public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException { SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); + if ( key == null ) throw new IOException("Key no longer registered"); + KeyAttachment att = (KeyAttachment) key.attachment(); int written = 0; boolean timedout = false; int keycount = 1; //assume we can write @@ -51,6 +90,7 @@ while ( (!timedout) && buf.hasRemaining()) { if (keycount > 0) { //only write if we were registered for a write int cnt = socket.write(buf); //write the data + lastWrite.set(cnt); if (cnt == -1) throw new EOFException(); written += cnt; @@ -59,12 +99,9 @@ continue; //we successfully wrote, try again without a selector } } - if ( key == null ) throw new IOException("Key no longer registered"); - KeyAttachment att = (KeyAttachment) key.attachment(); try { if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1); - //only register for write if a write has not yet been issued - if ( (att.interestOps() & SelectionKey.OP_WRITE) == 0) socket.getPoller().add(socket,SelectionKey.OP_WRITE); + poller.add(att,SelectionKey.OP_WRITE); att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS); }catch (InterruptedException ignore) { Thread.interrupted(); @@ -84,22 +121,14 @@ if (timedout) throw new SocketTimeoutException(); } finally { + poller.remove(att,SelectionKey.OP_WRITE); if (timedout && key != null) { - cancelKey(socket, key); + poller.cancelKey(socket, key); } } return written; } - private static void cancelKey(final NioChannel socket, final SelectionKey key) { - socket.getPoller().addEvent( - new Runnable() { - public void run() { - key.cancel(); - } - }); - } - /** * Performs a blocking read using the bytebuffer for data to be read * If the selector parameter is null, then it will perform a busy read that could @@ -113,8 +142,10 @@ * @throws SocketTimeoutException if the read times out * @throws IOException if an IO Exception occurs in the underlying socket logic */ - public static int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException { - final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); + public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException { + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); + if ( key == null ) throw new IOException("Key no longer registered"); + KeyAttachment att = (KeyAttachment) key.attachment(); int read = 0; boolean timedout = false; int keycount = 1; //assume we can write @@ -129,10 +160,9 @@ if (cnt > 0) break; } - KeyAttachment att = (KeyAttachment) key.attachment(); try { if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1); - if ( att.interestOps() == 0) socket.getPoller().add(socket,SelectionKey.OP_READ); + poller.add(att,SelectionKey.OP_READ); att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS); }catch (InterruptedException ignore) { Thread.interrupted(); @@ -151,11 +181,187 @@ if (timedout) throw new SocketTimeoutException(); } finally { + poller.remove(att,SelectionKey.OP_READ); if (timedout && key != null) { - cancelKey(socket,key); + poller.cancelKey(socket,key); } } return read; + } + + + protected class BlockPoller extends Thread { + protected boolean run = true; + protected Selector selector = null; + protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue(); + public void disable() { run = false; selector.wakeup();} + protected AtomicInteger wakeupCounter = new AtomicInteger(0); + public void cancelKey(final NioChannel socket, final SelectionKey key) { + Runnable r = new Runnable() { + public void run() { + key.cancel(); + } + }; + events.offer(r); + wakeup(); + } + + public void wakeup() { + if (wakeupCounter.addAndGet(1)==0) selector.wakeup(); + } + + public void cancel(SelectionKey sk, KeyAttachment key, int ops){ + if (sk!=null) { + sk.cancel(); + sk.attach(null); + if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch()); + if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch()); + } + } + + public void add(final KeyAttachment key, final int ops) { + Runnable r = new Runnable() { + public void run() { + if ( key == null ) return; + NioChannel nch = key.getChannel(); + if ( nch == null ) return; + SocketChannel ch = nch.getIOChannel(); + if ( ch == null ) return; + SelectionKey sk = ch.keyFor(selector); + try { + if (sk == null) { + sk = ch.register(selector, ops, key); + } else { + sk.interestOps(sk.interestOps() | ops); + } + }catch (CancelledKeyException cx) { + cancel(sk,key,ops); + }catch (ClosedChannelException cx) { + cancel(sk,key,ops); + } + } + }; + events.offer(r); + wakeup(); + } + + public void remove(final KeyAttachment key, final int ops) { + Runnable r = new Runnable() { + public void run() { + if ( key == null ) return; + NioChannel nch = key.getChannel(); + if ( nch == null ) return; + SocketChannel ch = nch.getIOChannel(); + if ( ch == null ) return; + SelectionKey sk = ch.keyFor(selector); + try { + if (sk == null) { + if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch()); + if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch()); + } else { + sk.interestOps(sk.interestOps() & (~ops)); + if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch()); + if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch()); + if (sk.interestOps()==0) { + sk.cancel(); + sk.attach(null); + } + } + }catch (CancelledKeyException cx) { + if (sk!=null) { + sk.cancel(); + sk.attach(null); + } + } + } + }; + events.offer(r); + wakeup(); + } + + + public boolean events() { + boolean result = false; + Runnable r = null; + result = (events.size() > 0); + while ( (r = (Runnable)events.poll()) != null ) { + r.run(); + result = true; + } + return result; + } + + public void run() { + while (run) { + try { + events(); + int keyCount = 0; + try { + int i = wakeupCounter.get(); + if (i>0) + keyCount = selector.selectNow(); + else { + wakeupCounter.set(-1); + keyCount = selector.select(1000); + } + wakeupCounter.set(0); + if (!run) break; + }catch ( NullPointerException x ) { + //sun bug 5076772 on windows JDK 1.5 + if (selector==null) throw x; + if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); + continue; + } catch ( CancelledKeyException x ) { + //sun bug 5076772 on windows JDK 1.5 + if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); + continue; + } catch (Throwable x) { + log.error("",x); + continue; + } + + Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; + + // Walk through the collection of ready keys and dispatch + // any active event. + while (run && iterator != null && iterator.hasNext()) { + SelectionKey sk = (SelectionKey) iterator.next(); + KeyAttachment attachment = (KeyAttachment)sk.attachment(); + try { + attachment.access(); + iterator.remove(); ; + sk.interestOps(sk.interestOps() & (~sk.readyOps())); + if ( sk.isReadable() ) { + countDown(attachment.getReadLatch()); + } + if (sk.isWritable()) { + countDown(attachment.getWriteLatch()); + } + }catch (CancelledKeyException ckx) { + if (sk!=null) sk.cancel(); + countDown(attachment.getReadLatch()); + countDown(attachment.getWriteLatch()); + } + }//while + }catch ( Throwable t ) { + log.error("",t); + } + } + events.clear(); + try { + selector.selectNow();//cancel all remaining keys + }catch( Exception ignore ) { + if (log.isDebugEnabled())log.debug("",ignore); + } + } + + public void countDown(CountDownLatch latch) { + if ( latch == null ) return; + latch.countDown(); + } + + + } } Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java?rev=596486&r1=596485&r2=596486&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java Mon Nov 19 15:28:21 2007 @@ -27,6 +27,7 @@ import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler; import java.nio.channels.Selector; import java.nio.channels.SelectionKey; +import org.apache.tomcat.util.MutableInteger; /** * @@ -70,7 +71,8 @@ * been flushed out and is empty * @return boolean */ - public boolean flush(boolean block, Selector s,long timeout) throws IOException { + public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException { + if (lastWrite!=null) lastWrite.set(1); return true; //no network buffer in the regular channel } Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=596486&r1=596485&r2=596486&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Nov 19 15:28:21 2007 @@ -103,7 +103,8 @@ */ public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; - public static final int OP_REGISTER = -1; //register interest op + public static final int OP_REGISTER = 0x100; //register interest op + public static final int OP_CALLBACK = 0x200; //callback interest op // ----------------------------------------------------------------- Fields @@ -183,7 +184,10 @@ */ long lastParachuteCheck = System.currentTimeMillis(); - + /** + * Keep track of how many threads are in use + */ + protected AtomicInteger activeSocketProcessors = new AtomicInteger(0); @@ -478,24 +482,11 @@ /** * The socket poller. */ - protected Poller[] pollers = null; - protected int pollerRoundRobin = 0; + protected Poller poller = null; public Poller getPoller0() { - pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; - Poller poller = pollers[pollerRoundRobin]; - return poller; - } - - - /** - * The socket poller used for Comet support. - */ - public Poller getCometPoller0() { - Poller poller = getPoller0(); return poller; } - /** * Dummy maxSpareThreads property. */ @@ -557,13 +548,15 @@ protected String truststoreType = System.getProperty("javax.net.ssl.trustStoreType"); public void setTruststoreType(String truststoreType) {this.truststoreType = truststoreType;} public String getTruststoreType() {return truststoreType;} - + protected String keystoreFile = System.getProperty("user.home")+"/.keystore"; public String getKeystoreFile() { return keystoreFile;} public void setKeystoreFile(String s ) { s = adjustRelativePath(s,System.getProperty("catalina.base")); this.keystoreFile = s; } + public void setKeystore(String s ) { setKeystoreFile(s);} + public String getKeystore() { return getKeystoreFile();} protected String algorithm = "SunX509"; public String getAlgorithm() { return algorithm;} @@ -641,6 +634,7 @@ this.oomParachuteData = oomParachuteData; } + protected SSLContext sslContext = null; public SSLContext getSSLContext() { return sslContext;} public void setSSLContext(SSLContext c) { sslContext = c;} @@ -679,14 +673,10 @@ * Number of keepalive sockets. */ public int getKeepAliveCount() { - if (pollers == null) { + if (poller == null) { return 0; } else { - int keepAliveCount = 0; - for (int i = 0; i < pollers.length; i++) { - keepAliveCount += pollers[i].getKeepAliveCount(); - } - return keepAliveCount; + return poller.selector.keys().size(); } } @@ -767,15 +757,11 @@ // Initialize SSL char[] passphrase = getKeystorePass().toCharArray(); - KeyStore ks = KeyStore.getInstance(getKeystoreType()); - ks.load(new FileInputStream(getKeystoreFile()), passphrase); - - KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm()); - kmf.init(ks, passphrase); - char[] tpassphrase = (getTruststorePass()!=null)?getTruststorePass().toCharArray():passphrase; String ttype = (getTruststoreType()!=null)?getTruststoreType():getKeystoreType(); - + + KeyStore ks = KeyStore.getInstance(getKeystoreType()); + ks.load(new FileInputStream(getKeystoreFile()), passphrase); KeyStore ts = null; if (getTruststoreFile()==null) { ts = KeyStore.getInstance(getKeystoreType()); @@ -785,16 +771,18 @@ ts.load(new FileInputStream(getTruststoreFile()), tpassphrase); } + KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm()); + kmf.init(ks, passphrase); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(getAlgorithm()); tmf.init(ts); sslContext = SSLContext.getInstance(getSslProtocol()); sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); - } if (oomParachute>0) reclaimParachute(true); - + selectorPool.open(); initialized = true; } @@ -819,7 +807,7 @@ TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-"); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); - taskqueue.setParent( (ThreadPoolExecutor) executor); + taskqueue.setParent( (ThreadPoolExecutor) executor, this); } } else if ( executor == null ) {//avoid two thread pools being created workers = new WorkerStack(maxThreads); @@ -833,16 +821,12 @@ acceptorThread.start(); } - // Start poller threads - pollers = new Poller[pollerThreadCount]; - for (int i = 0; i < pollerThreadCount; i++) { - pollers[i] = new Poller(); - pollers[i].init(); - Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i); - pollerThread.setPriority(threadPriority); - pollerThread.setDaemon(true); - pollerThread.start(); - } + // Start poller thread + poller = new Poller(); + Thread pollerThread = new Thread(poller, getName() + "-ClientPoller"); + pollerThread.setPriority(threadPriority); + pollerThread.setDaemon(true); + pollerThread.start(); } } @@ -876,10 +860,8 @@ if (running) { running = false; unlockAccept(); - for (int i = 0; i < pollers.length; i++) { - pollers[i].destroy(); - } - pollers = null; + poller.destroy(); + poller = null; } eventCache.clear(); keyCache.clear(); @@ -891,10 +873,11 @@ ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; tpe.shutdown(); TaskQueue queue = (TaskQueue) tpe.getQueue(); - queue.setParent(null); + queue.setParent(null,null); } executor = null; } + } @@ -912,6 +895,7 @@ sslContext = null; initialized = false; releaseCaches(); + selectorPool.close(); } @@ -954,6 +938,7 @@ return oomParachuteData; } + /** * Unlock the server socket accept using a bogus connection. */ @@ -1043,7 +1028,7 @@ engine.setNeedClientAuth(true); } else if ("want".equals(getClientAuth())) { engine.setWantClientAuth(true); - } + } engine.setUseClientMode(false); if ( ciphersarr.length > 0 ) engine.setEnabledCipherSuites(ciphersarr); if ( sslEnabledProtocolsarr.length > 0 ) engine.setEnabledProtocols(sslEnabledProtocolsarr); @@ -1165,6 +1150,8 @@ protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { try { + KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false); + attachment.setCometNotify(false); //will get reset upon next reg if (executor == null) { getWorkerThread().assign(socket, status); } else { @@ -1286,11 +1273,18 @@ if (key != null) { final KeyAttachment att = (KeyAttachment) key.attachment(); if ( att!=null ) { + //handle callback flag + if (att.getComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) { + att.setCometNotify(true); + } else { + att.setCometNotify(false); + } + interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag att.access();//to prevent timeout //we are registering the key to start with, reset the fairness counter. - att.setFairness(0); - att.interestOps(interestOps); - key.interestOps(interestOps); + int ops = key.interestOps() | interestOps; + att.interestOps(ops); + key.interestOps(ops); } else { cancel = true; } @@ -1310,6 +1304,7 @@ return super.toString()+"[intOps="+this.interestOps+"]"; } } + /** * Poller class. */ @@ -1320,9 +1315,6 @@ protected boolean close = false; protected long nextExpiration = 0;//optimize expiration handling - - protected int keepAliveCount = 0; - public int getKeepAliveCount() { return keepAliveCount; } protected AtomicLong wakeupCounter = new AtomicLong(0l); @@ -1337,14 +1329,6 @@ public Selector getSelector() { return selector;} /** - * Create the poller. With some versions of APR, the maximum poller size will - * be 62 (reocmpiling APR is necessary to remove this limitation). - */ - protected void init() { - keepAliveCount = 0; - } - - /** * Destroy the poller. */ protected void destroy() { @@ -1359,7 +1343,20 @@ public void addEvent(Runnable event) { events.offer(event); - if ( wakeupCounter.incrementAndGet() < 3 ) selector.wakeup(); + if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); + } + + public void cometInterest(NioChannel socket) { + KeyAttachment att = (KeyAttachment)socket.getAttachment(false); + add(socket,att.getCometOps()); + if ( (att.getCometOps()&OP_CALLBACK) == OP_CALLBACK ) { + nextExpiration = 0; //force the check for faster callback + selector.wakeup(); + } + } + + public void wakeup() { + selector.wakeup(); } /** @@ -1407,7 +1404,7 @@ socket.setPoller(this); KeyAttachment key = keyCache.poll(); final KeyAttachment ka = key!=null?key:new KeyAttachment(); - ka.reset(this,socket); + ka.reset(this,socket,getSocketProperties().getSoTimeout()); PollerEvent r = eventCache.poll(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); @@ -1422,9 +1419,14 @@ //the comet event takes care of clean up //processSocket(ka.getChannel(), status, dispatch); ka.setComet(false);//to avoid a loop - processSocket(ka.getChannel(), status, false);//don't dispatch if the lines below are cancelling the key - if (status == SocketStatus.TIMEOUT ) return; // don't close on comet timeout + if (status == SocketStatus.TIMEOUT ) { + processSocket(ka.getChannel(), status, true); + return; // don't close on comet timeout + } else { + processSocket(ka.getChannel(), status, false); //don't dispatch if the lines below are cancelling the key + } } + handler.release(ka.getChannel()); if (key.isValid()) key.cancel(); if (key.channel().isOpen()) try {key.channel().close();}catch (Exception ignore){} try {ka.channel.close(true);}catch (Exception ignore){} @@ -1462,7 +1464,14 @@ int keyCount = 0; try { if ( !close ) { - keyCount = selector.select(selectorTimeout); + if (wakeupCounter.get()>0) { + //if we are here, means we have other stuff to do + //do a non blocking select + keyCount = selector.selectNow(); + }else { + wakeupCounter.set( -1); + keyCount = selector.select(selectorTimeout); + } wakeupCounter.set(0); } if (close) { @@ -1473,10 +1482,12 @@ } } catch ( NullPointerException x ) { //sun bug 5076772 on windows JDK 1.5 + if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); if ( wakeupCounter == null || selector == null ) throw x; continue; } catch ( CancelledKeyException x ) { //sun bug 5076772 on windows JDK 1.5 + if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); if ( wakeupCounter == null || selector == null ) throw x; continue; } catch (Throwable x) { @@ -1492,6 +1503,7 @@ while (iterator != null && iterator.hasNext()) { SelectionKey sk = (SelectionKey) iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); + attachment.access(); iterator.remove(); processKey(sk, attachment); }//while @@ -1529,24 +1541,25 @@ sk.attach(attachment);//cant remember why this is here NioChannel channel = attachment.getChannel(); if (sk.isReadable() || sk.isWritable() ) { - if ( sk.isReadable() && attachment.getReadLatch() != null ) { - unreg(sk, attachment,SelectionKey.OP_READ); - attachment.getReadLatch().countDown(); - } else if ( sk.isWritable() && attachment.getWriteLatch() != null ) { - unreg(sk, attachment,SelectionKey.OP_WRITE); - attachment.getWriteLatch().countDown(); - } else if ( attachment.getSendfileData() != null ) { + if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment,true); } else if ( attachment.getComet() ) { //check if thread is available if ( isWorkerAvailable() ) { - unreg(sk, attachment, sk.readyOps()); - if (!processSocket(channel, SocketStatus.OPEN)) - processSocket(channel, SocketStatus.DISCONNECT); - attachment.setFairness(0); + //set interest ops to 0 so we don't get multiple + //invokations for both read and write on separate threads + reg(sk, attachment, 0); + //read goes before write + if (sk.isReadable()) { + //read notification + if (!processSocket(channel, SocketStatus.OPEN)) + processSocket(channel, SocketStatus.DISCONNECT); + } else { + //future placement of a WRITE notif + if (!processSocket(channel, SocketStatus.OPEN)) + processSocket(channel, SocketStatus.DISCONNECT); + } } else { - //increase the fairness counter - attachment.incFairness(); result = false; } } else { @@ -1557,10 +1570,7 @@ if (close) { cancelledKey(sk,SocketStatus.DISCONNECT,false); } - attachment.setFairness(0); } else { - //increase the fairness counter - attachment.incFairness(); result = false; } } @@ -1633,7 +1643,7 @@ if ( ((keyCount>0 || hasEvents) ||(now < nextExpiration)) && (!close) ) { return; } - long prevExp = nextExpiration; + long prevExp = nextExpiration; //for logging purposes only nextExpiration = now + socketProperties.getTimeoutInterval(); //timeout Set keys = selector.keys(); @@ -1647,6 +1657,10 @@ cancelledKey(key, SocketStatus.ERROR,false); //we don't support any keys without attachments } else if ( ka.getError() ) { cancelledKey(key, SocketStatus.ERROR,true); + } else if (ka.getComet() && ka.getCometNotify() ) { + reg(key,ka,0);//avoid multiple calls, this gets reregistered after invokation + //if (!processSocket(ka.getChannel(), SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT); + if (!processSocket(ka.getChannel(), SocketStatus.OPEN)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT); }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) { //only timeout sockets that we are waiting for a read from long delta = now - ka.getLastAccess(); @@ -1669,7 +1683,7 @@ cancelledKey(key, SocketStatus.ERROR,false); } }//for - if ( log.isDebugEnabled() ) log.debug("timeout completed: keycount="+keycount+"; now="+now+"; nextExpiration="+prevExp+"; "+ + if ( log.isDebugEnabled() ) log.debug("timeout completed: keys processed="+keycount+"; now="+now+"; nextExpiration="+prevExp+"; "+ "keyCount="+keyCount+"; hasEvents="+hasEvents +"; eval="+( (now < prevExp) && (keyCount>0 || hasEvents) && (!close) )); } @@ -1681,25 +1695,27 @@ public KeyAttachment() { } - public void reset(Poller poller, NioChannel channel) { + public void reset(Poller poller, NioChannel channel, long soTimeout) { this.channel = channel; this.poller = poller; lastAccess = System.currentTimeMillis(); currentAccess = false; comet = false; - timeout = -1; + timeout = soTimeout; error = false; - fairness = 0; lastRegistered = 0; sendfileData = null; if ( readLatch!=null ) try {for (int i=0; i<(int)readLatch.getCount();i++) readLatch.countDown();}catch (Exception ignore){} readLatch = null; if ( writeLatch!=null ) try {for (int i=0; i<(int)writeLatch.getCount();i++) writeLatch.countDown();}catch (Exception ignore){} writeLatch = null; + cometNotify = false; + cometOps = SelectionKey.OP_READ; + sendfileData = null; } public void reset() { - reset(null,null); + reset(null,null,-1); } public Poller getPoller() { return poller;} @@ -1709,9 +1725,12 @@ public void access(long access) { lastAccess = access; } public void setComet(boolean comet) { this.comet = comet; } public boolean getComet() { return comet; } + public void setCometNotify(boolean notify) { this.cometNotify = notify; } + public boolean getCometNotify() { return cometNotify; } + public void setCometOps(int ops) { this.cometOps = ops; } + public int getCometOps() { return cometOps; } public boolean getCurrentAccess() { return currentAccess; } public void setCurrentAccess(boolean access) { currentAccess = access; } - public Object getMutex() {return mutex;} public void setTimeout(long timeout) {this.timeout = timeout;} public long getTimeout() {return this.timeout;} public boolean getError() { return error; } @@ -1725,7 +1744,7 @@ public CountDownLatch getReadLatch() { return readLatch; } public CountDownLatch getWriteLatch() { return writeLatch; } protected CountDownLatch resetLatch(CountDownLatch latch) { - if ( latch.getCount() == 0 ) return null; + if ( latch==null || latch.getCount() == 0 ) return null; else throw new IllegalStateException("Latch must be at count 0"); } public void resetReadLatch() { readLatch = resetLatch(readLatch); } @@ -1748,25 +1767,22 @@ public void awaitReadLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(readLatch,timeout,unit);} public void awaitWriteLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(writeLatch,timeout,unit);} - public int getFairness() { return fairness; } - public void setFairness(int f) { fairness = f;} - public void incFairness() { fairness++; } public long getLastRegistered() { return lastRegistered; }; public void setLastRegistered(long reg) { lastRegistered = reg; } public void setSendfileData(SendfileData sf) { this.sendfileData = sf;} public SendfileData getSendfileData() { return this.sendfileData;} - protected Object mutex = new Object(); protected long lastAccess = -1; protected boolean currentAccess = false; protected boolean comet = false; + protected int cometOps = SelectionKey.OP_READ; + protected boolean cometNotify = false; protected long timeout = -1; protected boolean error = false; protected NioChannel channel = null; protected CountDownLatch readLatch = null; protected CountDownLatch writeLatch = null; - protected int fairness = 0; protected long lastRegistered = 0; protected SendfileData sendfileData = null; } @@ -1963,6 +1979,7 @@ public SocketState process(NioChannel socket); public SocketState event(NioChannel socket, SocketStatus status); public void releaseCaches(); + public void release(NioChannel socket); } @@ -2044,6 +2061,7 @@ } public void run() { + NioEndpoint.this.activeSocketProcessors.addAndGet(1); SelectionKey key = null; try { key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); @@ -2117,7 +2135,7 @@ status = null; //return to cache processorCache.offer(this); - } + NioEndpoint.this.activeSocketProcessors.addAndGet(-1); } } } @@ -2125,6 +2143,7 @@ // ---------------------------------------------- TaskQueue Inner Class public static class TaskQueue extends LinkedBlockingQueue { ThreadPoolExecutor parent = null; + NioEndpoint endpoint = null; public TaskQueue() { super(); @@ -2139,8 +2158,9 @@ } - public void setParent(ThreadPoolExecutor tp) { + public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) { parent = tp; + this.endpoint = ep; } public boolean offer(Runnable o) { @@ -2150,7 +2170,7 @@ if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); //we have idle threads, just add it to the queue //this is an approximation, so it could use some tuning - if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o); + if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) return super.offer(o); //if we have less threads than maximum force creation of a new thread if (parent.getPoolSize() selectors = new ConcurrentLinkedQueue(); - protected static Selector getSharedSelector() throws IOException { + protected Selector getSharedSelector() throws IOException { if (SHARED && SHARED_SELECTOR == null) { synchronized ( NioSelectorPool.class ) { if ( SHARED_SELECTOR == null ) { @@ -102,6 +114,9 @@ while ( (s = selectors.poll()) != null ) s.close(); spare.set(0); active.set(0); + if (blockingSelector!=null) { + blockingSelector.close(); + } if ( SHARED && getSharedSelector()!=null ) { getSharedSelector().close(); SHARED_SELECTOR = null; @@ -111,6 +126,11 @@ public void open() throws IOException { enabled = true; getSharedSelector(); + if (SHARED) { + blockingSelector = new NioBlockingSelector(); + blockingSelector.open(getSharedSelector()); + } + } /** @@ -127,12 +147,13 @@ * @throws IOException if an IO Exception occurs in the underlying socket logic */ public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout) throws IOException { - return write(buf,socket,selector,writeTimeout,true); + return write(buf,socket,selector,writeTimeout,true,null); } - public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout, boolean block) throws IOException { - if ( SHARED && block) { - return NioBlockingSelector.write(buf,socket,writeTimeout); + public int write(ByteBuffer buf, NioChannel socket, Selector selector, + long writeTimeout, boolean block,MutableInteger lastWrite) throws IOException { + if ( SHARED && block ) { + return blockingSelector.write(buf,socket,writeTimeout,lastWrite); } SelectionKey key = null; int written = 0; @@ -144,7 +165,9 @@ int cnt = 0; if ( keycount > 0 ) { //only write if we were registered for a write cnt = socket.write(buf); //write the data + if (lastWrite!=null) lastWrite.set(cnt); if (cnt == -1) throw new EOFException(); + written += cnt; if (cnt > 0) { time = System.currentTimeMillis(); //reset our timeout timer @@ -202,8 +225,8 @@ * @throws IOException if an IO Exception occurs in the underlying socket logic */ public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException { - if ( SHARED && block) { - return NioBlockingSelector.read(buf,socket,readTimeout); + if ( SHARED && block ) { + return blockingSelector.read(buf,socket,readTimeout); } SelectionKey key = null; int read = 0; @@ -250,6 +273,10 @@ this.enabled = enabled; } + public void setSharedSelectorTimeout(long sharedSelectorTimeout) { + this.sharedSelectorTimeout = sharedSelectorTimeout; + } + public int getMaxSelectors() { return maxSelectors; } @@ -260,5 +287,17 @@ public boolean isEnabled() { return enabled; + } + + public long getSharedSelectorTimeout() { + return sharedSelectorTimeout; + } + + public ConcurrentLinkedQueue getSelectors() { + return selectors; + } + + public AtomicInteger getSpare() { + return spare; } } Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?rev=596486&r1=596485&r2=596486&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Mon Nov 19 15:28:21 2007 @@ -25,6 +25,7 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import java.nio.channels.Selector; +import org.apache.tomcat.util.MutableInteger; /** * @@ -102,11 +103,11 @@ * been flushed out and is empty * @return boolean */ - public boolean flush(boolean block, Selector s, long timeout) throws IOException { + public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException { if (!block) { flush(netOutBuffer); } else { - pool.write(netOutBuffer, this, s, timeout); + pool.write(netOutBuffer, this, s, timeout,block,lastWrite); } return !netOutBuffer.hasRemaining(); } @@ -402,32 +403,33 @@ if ( src != bufHandler.getWriteBuffer() ) throw new IllegalArgumentException("You can only write using the application write buffer provided by the handler."); //are we closing or closed? if ( closing || closed) throw new IOException("Channel is in closing state."); - + //the number of bytes written int written = 0; - + if (!flush(netOutBuffer)) { //we haven't emptied out the buffer yet return written; } - + /* * The data buffer is empty, we can reuse the entire buffer. */ netOutBuffer.clear(); - + SSLEngineResult result = sslEngine.wrap(src, netOutBuffer); written = result.bytesConsumed(); netOutBuffer.flip(); - + if (result.getStatus() == Status.OK) { if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) tasks(); } else { throw new IOException("Unable to wrap data, invalid engine state: " +result.getStatus()); } - + //force a flush flush(netOutBuffer); + return written; } } Modified: tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml?rev=596486&r1=596485&r2=596486&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml (original) +++ tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml Mon Nov 19 15:28:21 2007 @@ -35,6 +35,11 @@
+ 43846 + Fix block simulated read and writes causing timeouts. + Add non blocking parsing of HTTP request headers. + Perf improvements(fhanik) + Cookie handling/parsing changes! The following behavior has been changed with regards to Tomcat's cookie handling --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org For additional commands, e-mail: dev-help@tomcat.apache.org