tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r465417 - in /tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11: Http11NioProcessor.java InternalNioInputBuffer.java
Date Wed, 18 Oct 2006 23:24:53 GMT
Author: fhanik
Date: Wed Oct 18 16:24:52 2006
New Revision: 465417

URL: http://svn.apache.org/viewvc?view=rev&rev=465417
Log:
Implement non blocking read on HTTP requests.

A common scalability problem when it comes to HTTP is the fact that there are slow clients
that will block server resources while sending an HTTP request, especially when the request
has larger request headers.

On FreeBSD the kernel has a built-in HTTP filter that does not wake up the application socket
handle until the entire request has been received; however, on other platforms this is not available.

With the Tomcat connectors, there is an obvious problem when it comes to slow clients: if
the client sends a partial request, Tomcat will block the thread until the client has finished
sending the request. For example, if the client has 10 headers and sends the first 5 headers,
then the next 5 in a sequential batch, the Tomcat thread is locked in a blocking read.
I've tried to fix that problem by making the NIO connector non-blocking. The only time
the NIO connector will block now is when the servlet asks for data, usually the request body,
as we don't have a way to suspend a thread, like continuations.
Once we have continuations (that can truly remember thread stack data), we can have a truly
non-blocking server, but we are not there yet.

I believe this code could be easily ported to the APR connector with very little effort.
When you review this code, please note that I have not attempted to rewrite the header parse
logic. I might do that at a later stage, as this got a little messy, but I wanted the proof
of concept done first and to reuse as much code as possible.

Please feel free to review and even flame me if needed, at least that means this got some
attention :)


Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java

Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?view=diff&rev=465417&r1=465416&r2=465417
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java Wed Oct 18
16:24:52 2006
@@ -820,7 +820,7 @@
 
         boolean keptAlive = false;
         boolean openSocket = false;
-
+        boolean recycle = true;
         while (!error && keepAlive && !comet) {
 
             // Parsing the request header
@@ -829,8 +829,7 @@
                     socket.getIOChannel().socket().setSoTimeout((int)soTimeout);
                     inputBuffer.readTimeout = soTimeout;
                 }
-                if (!inputBuffer.parseRequestLine
-                        (keptAlive && (endpoint.getCurrentThreadsBusy() > limit)))
{
+                if (!inputBuffer.parseRequestLine(keptAlive && (endpoint.getCurrentThreadsBusy()
> limit))) {
                     // This means that no data is available right now
                     // (long keepalive), so that the processor should be recycled
                     // and the method should return true
@@ -839,13 +838,18 @@
                     socket.getPoller().add(socket);
                     break;
                 }
-                request.setStartTime(System.currentTimeMillis());
                 keptAlive = true;
-                if (!disableUploadTimeout) {
+                if ( !inputBuffer.parseHeaders() ) {
+                    openSocket = true;
+                    socket.getPoller().add(socket);
+                    recycle = false;
+                    break;
+                }
+                request.setStartTime(System.currentTimeMillis());
+                if (!disableUploadTimeout) { //only for body, not for request headers
                     socket.getIOChannel().socket().setSoTimeout((int)timeout);
                     inputBuffer.readTimeout = soTimeout;
                 }
-                inputBuffer.parseHeaders();
             } catch (IOException e) {
                 error = true;
                 break;
@@ -934,7 +938,7 @@
                 return SocketState.LONG;
             }
         } else {
-            recycle();
+            if ( recycle ) recycle();
             return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;
         }
 

Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?view=diff&rev=465417&r1=465416&r2=465417
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java Wed Oct
18 16:24:52 2006
@@ -45,9 +45,10 @@
 
     // -------------------------------------------------------------- Constants
 
-
+    enum HeaderParseStatus {DONE, HAVE_MORE_HEADERS, NEED_MORE_DATA}
+    enum HeaderParsePosition {HEADER_START, HEADER_NAME, HEADER_VALUE, HEADER_MULTI_LINE}
     // ----------------------------------------------------------- Constructors
-
+    
 
     /**
      * Alternate constructor.
@@ -72,6 +73,9 @@
         lastActiveFilter = -1;
 
         parsingHeader = true;
+        parsingRequestLine = true;
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        headerData.recycle();
         swallowInput = true;
 
         if (readTimeout < 0) {
@@ -112,6 +116,8 @@
      * State.
      */
     protected boolean parsingHeader;
+    protected boolean parsingRequestLine;
+    protected HeaderParsePosition headerParsePos;
 
 
     /**
@@ -286,6 +292,9 @@
         pos = 0;
         lastActiveFilter = -1;
         parsingHeader = true;
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        parsingRequestLine = true;
+        headerData.recycle();
         swallowInput = true;
 
     }
@@ -325,6 +334,9 @@
         pos = 0;
         lastActiveFilter = -1;
         parsingHeader = true;
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        parsingRequestLine = true;
+        headerData.recycle();
         swallowInput = true;
 
     }
@@ -360,6 +372,9 @@
     public boolean parseRequestLine(boolean useAvailableData)
         throws IOException {
 
+        //check state
+        if ( !parsingRequestLine ) return true;
+        
         int start = 0;
 
         //
@@ -375,7 +390,7 @@
                     return false;
                 }
                 if (readTimeout == -1) {
-                    if (!fill()) //request line parsing
+                    if (!fill(false,true)) //request line parsing
                         throw new EOFException(sm.getString("iib.eof.error"));
                 } else {
                     // Do a simple read with a short timeout
@@ -397,8 +412,8 @@
                 return false;
             }
             if (readTimeout == -1) {
-                if (!fill()) //request line parsing
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(false,false)) //request line parsing
+                    return false;
             } else {
                 // Do a simple read with a short timeout
                 if ( !readSocket(true, false) ) return false;
@@ -416,8 +431,8 @@
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //request line parsing
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) //request line parsing
+                    return false;
             }
 
             if (buf[pos] == Constants.SP) {
@@ -445,8 +460,8 @@
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //request line parsing
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) //request line parsing
+                    return false;
             }
 
             if (buf[pos] == Constants.SP) {
@@ -489,8 +504,8 @@
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //reques line parsing
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) //reques line parsing
+                    return false;
             }
 
             if (buf[pos] == Constants.CR) {
@@ -510,7 +525,7 @@
         } else {
             request.protocol().setString("");
         }
-
+        parsingRequestLine = false;
         return true;
 
     }
@@ -552,7 +567,7 @@
             } else if ( !block ) {
                 return false;
             } else {
-                timedOut = (readTimeout != -1) && ((System.currentTimeMillis()-start)>readTimeout);
+                timedOut = timeout && (readTimeout != -1) && ((System.currentTimeMillis()-start)>readTimeout);
                 if ( !timedOut && nRead == 0 )  {
                     try {
                         final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
@@ -604,15 +619,20 @@
     /**
      * Parse the HTTP headers.
      */
-    public void parseHeaders()
+    public boolean parseHeaders()
         throws IOException {
-
-        while (parseHeader()) {
+        HeaderParseStatus status = HeaderParseStatus.HAVE_MORE_HEADERS;
+        
+        do {
+            status = parseHeader();
+        } while ( status == HeaderParseStatus.HAVE_MORE_HEADERS );
+        if (status == HeaderParseStatus.DONE) {
+            parsingHeader = false;
+            end = pos;
+            return true;
+        } else {
+            return false;
         }
-
-        parsingHeader = false;
-        end = pos;
-
     }
 
 
@@ -622,7 +642,7 @@
      * @return false after reading a blank line (which indicates that the
      * HTTP header parsing is done
      */
-    public boolean parseHeader()
+    public HeaderParseStatus parseHeader()
         throws IOException {
 
         //
@@ -630,12 +650,14 @@
         //
 
         byte chr = 0;
-        while (true) {
+        while (headerParsePos == HeaderParsePosition.HEADER_START) {
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //parse header
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) {//parse header 
+                    headerParsePos = HeaderParsePosition.HEADER_START;
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
             }
 
             chr = buf[pos];
@@ -643,7 +665,7 @@
             if ((chr == Constants.CR) || (chr == Constants.LF)) {
                 if (chr == Constants.LF) {
                     pos++;
-                    return false;
+                    return HeaderParseStatus.DONE;
                 }
             } else {
                 break;
@@ -653,28 +675,31 @@
 
         }
 
-        // Mark the current buffer position
-        int start = pos;
+        if ( headerParsePos == HeaderParsePosition.HEADER_START ) {
+            // Mark the current buffer position
+            headerData.start = pos;
+            headerParsePos = HeaderParsePosition.HEADER_NAME;
+        }    
 
         //
         // Reading the header name
         // Header name is always US-ASCII
         //
+        
+        
 
-        boolean colon = false;
-        MessageBytes headerValue = null;
-
-        while (!colon) {
+        while (headerParsePos == HeaderParsePosition.HEADER_NAME) {
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //parse header
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) { //parse header 
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
             }
 
             if (buf[pos] == Constants.COLON) {
-                colon = true;
-                headerValue = headers.addValue(buf, start, pos - start);
+                headerParsePos = HeaderParsePosition.HEADER_VALUE;
+                headerData.headerValue = headers.addValue(buf, headerData.start, pos - headerData.start);
             }
             chr = buf[pos];
             if ((chr >= Constants.A) && (chr <= Constants.Z)) {
@@ -682,97 +707,121 @@
             }
 
             pos++;
-
+            if ( headerParsePos == HeaderParsePosition.HEADER_VALUE ) { 
+                // Mark the current buffer position
+                headerData.start = pos;
+                headerData.realPos = pos;
+            }
         }
 
-        // Mark the current buffer position
-        start = pos;
-        int realPos = pos;
-
+        
         //
         // Reading the header value (which can be spanned over multiple lines)
         //
 
         boolean eol = false;
-        boolean validLine = true;
 
-        while (validLine) {
+        while (headerParsePos == HeaderParsePosition.HEADER_VALUE ||
+               headerParsePos == HeaderParsePosition.HEADER_MULTI_LINE) {
+            if ( headerParsePos == HeaderParsePosition.HEADER_VALUE ) {
+            
+                boolean space = true;
 
-            boolean space = true;
+                // Skipping spaces
+                while (space) {
 
-            // Skipping spaces
-            while (space) {
+                    // Read new bytes if needed
+                    if (pos >= lastValid) {
+                        if (!fill(true,false)) {//parse header 
+                            //HEADER_VALUE, should already be set
+                            return HeaderParseStatus.NEED_MORE_DATA;
+                        }
+                    }
 
-                // Read new bytes if needed
-                if (pos >= lastValid) {
-                    if (!fill()) //parse header
-                        throw new EOFException(sm.getString("iib.eof.error"));
-                }
+                    if ((buf[pos] == Constants.SP) || (buf[pos] == Constants.HT)) {
+                        pos++;
+                    } else {
+                        space = false;
+                    }
 
-                if ((buf[pos] == Constants.SP) || (buf[pos] == Constants.HT)) {
-                    pos++;
-                } else {
-                    space = false;
                 }
 
-            }
-
-            int lastSignificantChar = realPos;
+                headerData.lastSignificantChar = headerData.realPos;
 
-            // Reading bytes until the end of the line
-            while (!eol) {
+                // Reading bytes until the end of the line
+                while (!eol) {
 
-                // Read new bytes if needed
-                if (pos >= lastValid) {
-                    if (!fill()) //parse header
-                        throw new EOFException(sm.getString("iib.eof.error"));
-                }
+                    // Read new bytes if needed
+                    if (pos >= lastValid) {
+                        if (!fill(true,false)) {//parse header 
+                            //HEADER_VALUE
+                            return HeaderParseStatus.NEED_MORE_DATA;
+                        }
 
-                if (buf[pos] == Constants.CR) {
-                } else if (buf[pos] == Constants.LF) {
-                    eol = true;
-                } else if (buf[pos] == Constants.SP) {
-                    buf[realPos] = buf[pos];
-                    realPos++;
-                } else {
-                    buf[realPos] = buf[pos];
-                    realPos++;
-                    lastSignificantChar = realPos;
-                }
+                    }
 
-                pos++;
+                    if (buf[pos] == Constants.CR) {
+                    } else if (buf[pos] == Constants.LF) {
+                        eol = true;
+                    } else if (buf[pos] == Constants.SP) {
+                        buf[headerData.realPos] = buf[pos];
+                        headerData.realPos++;
+                    } else {
+                        buf[headerData.realPos] = buf[pos];
+                        headerData.realPos++;
+                        headerData.lastSignificantChar = headerData.realPos;
+                    }
 
-            }
+                    pos++;
 
-            realPos = lastSignificantChar;
+                }
 
-            // Checking the first character of the new line. If the character
-            // is a LWS, then it's a multiline header
+                headerData.realPos = headerData.lastSignificantChar;
 
+                // Checking the first character of the new line. If the character
+                // is a LWS, then it's a multiline header
+                headerParsePos = HeaderParsePosition.HEADER_MULTI_LINE;
+            }
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //parse header
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) {//parse header
+                    
+                    //HEADER_MULTI_LINE
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
             }
 
             chr = buf[pos];
-            if ((chr != Constants.SP) && (chr != Constants.HT)) {
-                validLine = false;
-            } else {
-                eol = false;
-                // Copying one extra space in the buffer (since there must
-                // be at least one space inserted between the lines)
-                buf[realPos] = chr;
-                realPos++;
+            if ( headerParsePos == HeaderParsePosition.HEADER_MULTI_LINE ) {
+                if ( (chr != Constants.SP) && (chr != Constants.HT)) {
+                    headerParsePos = HeaderParsePosition.HEADER_START;
+                } else {
+                    eol = false;
+                    // Copying one extra space in the buffer (since there must
+                    // be at least one space inserted between the lines)
+                    buf[headerData.realPos] = chr;
+                    headerData.realPos++;
+                }
             }
-
         }
-
         // Set the header value
-        headerValue.setBytes(buf, start, realPos - start);
-
-        return true;
-
+        headerData.headerValue.setBytes(buf, headerData.start, headerData.realPos - headerData.start);
+        headerData.recycle();
+        return HeaderParseStatus.HAVE_MORE_HEADERS;
+    }
+    
+    protected HeaderParseData headerData = new HeaderParseData();
+    public static class HeaderParseData {
+        int start = 0;
+        int realPos = 0;
+        int lastSignificantChar = 0;
+        MessageBytes headerValue = null;
+        public void recycle() {
+            start = 0;
+            realPos = 0;
+            lastSignificantChar = 0;
+            headerValue = null;
+        }
     }
 
 
@@ -795,14 +844,13 @@
 
     // ------------------------------------------------------ Protected Methods
 
-
     /**
      * Fill the internal buffer using data from the undelying input stream.
      * 
      * @return false if at end of stream
      */
-    protected boolean fill()
-        throws IOException {
+    protected boolean fill(boolean timeout, boolean block)
+        throws IOException, EOFException {
 
         boolean read = false;
 
@@ -814,7 +862,7 @@
             }
 
             // Do a simple read with a short timeout
-            read = readSocket(true,true);
+            read = readSocket(timeout,block);
         } else {
 
             if (buf.length - end < 4500) {
@@ -827,7 +875,7 @@
             pos = end;
             lastValid = pos;
             // Do a simple read with a short timeout
-            read = readSocket(true, true);
+            read = readSocket(timeout, block);
         }
         return read;
     }
@@ -851,7 +899,7 @@
             throws IOException {
 
             if (pos >= lastValid) {
-                if (!fill()) //read body
+                if (!fill(true,true)) //read body, must be blocking, as the thread is inside
the app
                     return -1;
             }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message