tomcat-dev mailing list archives

From "Filip Hanik (mailing lists)" <devli...@hanik.com>
Subject RE: svn commit: r1358055 - in /tomcat/trunk: java/org/apache/catalina/connector/ java/org/apache/catalina/core/ java/org/apache/coyote/ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/coyote/spdy/ java/org/apache/tomcat/util/net
Date Fri, 06 Jul 2012 16:35:10 GMT
Turning on Java 7 does change the test landscape.
Right now, I can get the test suite to run fine on Java 6, but I get lots of errors on Java 7.

Filip
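
For readers skimming the quoted diff below: a minimal, self-contained sketch of the flip-once idea behind the ByteBufferHolder helper that the commit adds to InternalNioOutputBuffer. The class name FlipOnceHolder and the main method are illustrative assumptions, not Tomcat code; only the flip()/hasData() logic mirrors the quoted patch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the ByteBufferHolder shown in the quoted diff.
public class FlipOnceHolder {
    private final ByteBuffer buf;
    private final AtomicBoolean flipped;

    public FlipOnceHolder(ByteBuffer buf, boolean flipped) {
        this.buf = buf;
        this.flipped = new AtomicBoolean(flipped);
    }

    public ByteBuffer getBuf() {
        return buf;
    }

    public boolean isFlipped() {
        return flipped.get();
    }

    // Switch the buffer from write mode to read mode at most once.
    // compareAndSet ensures a second caller does not flip it again.
    public boolean flip() {
        if (flipped.compareAndSet(false, true)) {
            buf.flip();
            return true;
        }
        return false;
    }

    // Before the flip, data is whatever has been written (position > 0);
    // after the flip, data is whatever is still unread (remaining > 0).
    public boolean hasData() {
        return flipped.get() ? buf.remaining() > 0 : buf.position() > 0;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put("hello".getBytes(StandardCharsets.US_ASCII));
        FlipOnceHolder holder = new FlipOnceHolder(buffer, false);

        System.out.println(holder.hasData());            // true
        System.out.println(holder.flip());               // true
        System.out.println(holder.flip());               // false
        System.out.println(holder.getBuf().remaining()); // 5
    }
}
```

Tracking the flipped state next to the buffer is what lets hasData() answer correctly in both modes, since a ByteBuffer itself does not record whether flip() has been called.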

> -----Original Message-----
> From: Filip Hanik (mailing lists) [mailto:devlists@hanik.com]
> Sent: Friday, July 06, 2012 1:17 AM
> To: 'Tomcat Developers List'
> Subject: RE: svn commit: r1358055 - in /tomcat/trunk:
> java/org/apache/catalina/connector/ java/org/apache/catalina/core/
> java/org/apache/coyote/ java/org/apache/coyote/ajp/
> java/org/apache/coyote/http11/ java/org/apache/coyote/spdy/
> java/org/apache/tomcat/util/net
> 
> NIO Tests:    128
> BIO Tests:    128
> NIO Failures      1
> BIO Failures     10
> 
> I'll take a look at test failures on Monday, unless someone beats me to
> it.
> 
> They are not consistent, as they don't happen when running individual
> tests, only when running the entire test suite.
> 
> Filip
> 
> > -----Original Message-----
> > From: fhanik@apache.org [mailto:fhanik@apache.org]
> > Sent: Friday, July 06, 2012 12:54 AM
> > To: dev@tomcat.apache.org
> > Subject: svn commit: r1358055 - in /tomcat/trunk:
> > java/org/apache/catalina/connector/ java/org/apache/catalina/core/
> > java/org/apache/coyote/ java/org/apache/coyote/ajp/
> > java/org/apache/coyote/http11/ java/org/apache/coyote/spdy/
> > java/org/apache/tomcat/util/net/...
> >
> > Author: fhanik
> > Date: Fri Jul  6 06:53:52 2012
> > New Revision: 1358055
> >
> > URL: http://svn.apache.org/viewvc?rev=1358055&view=rev
> > Log:
> > implement rev 1 of async/non blocking writes
> >
> >
> > Modified:
> >     tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
> >     tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
> >     tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
> >     tomcat/trunk/java/org/apache/catalina/connector/Request.java
> >     tomcat/trunk/java/org/apache/catalina/connector/Response.java
> >     tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
> >     tomcat/trunk/java/org/apache/coyote/ActionCode.java
> >     tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
> >     tomcat/trunk/java/org/apache/coyote/Response.java
> >     tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
> >     tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
> >     tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
> >     tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
> >     tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
> >     tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
> >     tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
> >     tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
> >     tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
> >     tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
> >     tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
> >     tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
> >     tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
> >
> > Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> > ==============================================================================
> > --- tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java (original)
> > +++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java Fri Jul  6 06:53:52 2012
> > @@ -296,18 +296,15 @@ public class CoyoteAdapter implements Ad
> >              }
> >
> >
> > -            if (!request.isAsyncDispatching() && request.isAsync()) {
> > -                AtomicBoolean result = new AtomicBoolean(true);
> > -                req.action(ActionCode.ASYNC_DISPATCH_FOR_OPERATION, this);
> > -                if (result.get()) {
> > -                    if (status==SocketStatus.OPEN_WRITE) {
> > -                        //TODO Notify write listener
> > -                    } else if (status==SocketStatus.OPEN_READ) {
> > -                        //TODO Notify read listener
> > -                        asyncConImpl.canRead();
> > -                    }
> > -                    success = true;
> > +            if (!request.isAsyncDispatching() && request.isAsync() && request.isAsyncOperation()) {
> > +                if (status == SocketStatus.OPEN_WRITE) {
> > +                    // TODO Notify write listener
> > +                    success = asyncConImpl.canWrite();
> > +                } else if (status == SocketStatus.OPEN_READ) {
> > +                    // TODO Notify read listener
> > +                    success = asyncConImpl.canRead();
> >                  }
> > +
> >              }
> >
> >              if (request.isAsyncDispatching()) {
> >
> > Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> > ==============================================================================
> > --- tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java (original)
> > +++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java Fri Jul  6 06:53:52 2012
> > @@ -109,23 +109,15 @@ public class CoyoteOutputStream
> >          ob.close();
> >      }
> >
> > -    /**
> > -     * TODO SERVLET 3.1
> > -     */
> >      @Override
> >      public boolean canWrite() {
> > -        // TODO Auto-generated method stub
> > -        return false;
> > +        return ob.canWrite();
> >      }
> >
> >
> > -    /**
> > -     * TODO SERVLET 3.1
> > -     */
> >      @Override
> >      public void setWriteListener(WriteListener listener) {
> > -        // TODO Auto-generated method stub
> > -
> > +        ob.setWriteListener(listener);
> >      }
> >
> >
> >
> > Modified: tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> > ==============================================================================
> > --- tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java (original)
> > +++ tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java Fri Jul  6 06:53:52 2012
> > @@ -23,6 +23,9 @@ import java.security.AccessController;
> >  import java.security.PrivilegedActionException;
> >  import java.security.PrivilegedExceptionAction;
> >  import java.util.HashMap;
> > +import java.util.concurrent.atomic.AtomicBoolean;
> > +
> > +import javax.servlet.WriteListener;
> >
> >  import org.apache.catalina.Globals;
> >  import org.apache.coyote.ActionCode;
> > @@ -607,4 +610,25 @@ public class OutputBuffer extends Writer
> >      }
> >
> >
> > +    public boolean canWrite() {
> > +        if (getWriteListener()==null) throw new IllegalStateException("not in non blocking mode.");
> > +        AtomicBoolean canWrite = new AtomicBoolean(true);
> > +        coyoteResponse.action(ActionCode.NB_WRITE_INTEREST, canWrite);
> > +        return canWrite.get();
> > +}
> > +
> > +
> > +
> > +    private volatile WriteListener listener;
> > +    public void setWriteListener(WriteListener listener) {
> > +        this.listener = listener;
> > +        coyoteResponse.action(ActionCode.SET_WRITE_LISTENER, listener);
> > +    }
> > +
> > +    public WriteListener getWriteListener() {
> > +        return listener;
> > +    }
> > +
> > +
> > +
> >  }
> >
> > Modified: tomcat/trunk/java/org/apache/catalina/connector/Request.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/conne
> > ctor/Request.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/catalina/connector/Request.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/catalina/connector/Request.java Fri
> Jul
> > 6 06:53:52 2012
> > @@ -1653,6 +1653,16 @@ public class Request
> >          return result.get();
> >      }
> >
> > +    public boolean isAsyncOperation() {
> > +        if (asyncContext == null) {
> > +            return false;
> > +        }
> > +
> > +        AtomicBoolean result = new AtomicBoolean(false);
> > +        coyoteRequest.action(ActionCode.ASYNC_IS_ASYNC_OPERATION, result);
> > +        return result.get();
> > +    }
> > +
> >      @Override
> >      public boolean isAsyncSupported() {
> >          if (this.asyncSupported == null) {
> >
> > Modified:
> tomcat/trunk/java/org/apache/catalina/connector/Response.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/conne
> > ctor/Response.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/catalina/connector/Response.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/catalina/connector/Response.java Fri
> > Jul  6 06:53:52 2012
> > @@ -153,6 +153,10 @@ public class Response
> >          outputBuffer.setResponse(coyoteResponse);
> >      }
> >
> > +    public org.apache.coyote.Response getCoyoteResponse() {
> > +        return this.coyoteResponse;
> > +    }
> > +
> >
> >      /**
> >       * Return the Context within which this Request is being processed.
> >
> > Modified:
> > tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/core/
> > AsyncContextImpl.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
> Fri
> > Jul  6 06:53:52 2012
> > @@ -106,6 +106,8 @@ public class AsyncContextImpl implements
> >      }
> >
> >      public boolean canRead() throws IOException {
> > +        if (request.getCoyoteRequest().getReadListener()==null) return false;
> > +
> >          ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
> >          ClassLoader newCL = request.getContext().getLoader().getClassLoader();
> >          try {
> > @@ -121,7 +123,16 @@ public class AsyncContextImpl implements
> >      }
> >
> >      public boolean canWrite() throws IOException {
> > -        return false;
> > +        if (request.getResponse().getCoyoteResponse().getWriteListener()==null) return false;
> > +        ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
> > +        ClassLoader newCL = request.getContext().getLoader().getClassLoader();
> > +        try {
> > +            Thread.currentThread().setContextClassLoader(newCL);
> > +            request.getResponse().getCoyoteResponse().getWriteListener().onWritePossible();
> > +        }finally {
> > +            Thread.currentThread().setContextClassLoader(oldCL);
> > +    }
> > +        return true;
> >      }
> >
> >      public boolean timeout() throws IOException {
> >
> > Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionC
> > ode.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
> > +++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Fri Jul  6
> > 06:53:52 2012
> > @@ -183,6 +183,11 @@ public enum ActionCode {
> >      ASYNC_IS_ASYNC,
> >
> >      /**
> > +     * Callback to determine if async read/write is in progress
> > +     */
> > +    ASYNC_IS_ASYNC_OPERATION,
> > +
> > +    /**
> >       * Callback to determine if async dispatch is in progress
> >       */
> >      ASYNC_IS_STARTED,
> >
> > Modified: tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AsyncSt
> > ateMachine.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java Fri Jul
> > 6 06:53:52 2012
> > @@ -149,6 +149,10 @@ public class AsyncStateMachine<S> {
> >          return state.isAsync();
> >      }
> >
> > +    public boolean isAsyncOperation() {
> > +        return state == AsyncState.READ_WRITE_OP;
> > +    }
> > +
> >      public boolean isAsyncDispatching() {
> >          return state.isDispatching();
> >      }
> >
> > Modified: tomcat/trunk/java/org/apache/coyote/Response.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Respons
> > e.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/Response.java (original)
> > +++ tomcat/trunk/java/org/apache/coyote/Response.java Fri Jul  6
> > 06:53:52 2012
> > @@ -21,6 +21,11 @@ import java.io.IOException;
> >  import java.io.StringReader;
> >  import java.util.Locale;
> >
> > +import javax.servlet.ReadListener;
> > +import javax.servlet.WriteListener;
> > +
> > +import org.apache.coyote.http11.AbstractInputBuffer;
> > +import org.apache.coyote.http11.AbstractOutputBuffer;
> >  import org.apache.tomcat.util.buf.ByteChunk;
> >  import org.apache.tomcat.util.http.MimeHeaders;
> >  import org.apache.tomcat.util.http.parser.AstMediaType;
> > @@ -541,4 +546,29 @@ public final class Response {
> >          }
> >          return outputBuffer.getBytesWritten();
> >      }
> > +
> > +    protected volatile WriteListener listener;
> > +
> > +    public WriteListener getWriteListener() {
> > +        return listener;
> > +}
> > +
> > +    public void setWriteListener(WriteListener listener) {
> > +        //TODO SERVLET 3.1 is it allowed to switch from non block to blocking?
> > +        setBlocking(listener==null);
> > +        this.listener = listener;
> > +    }
> > +
> > +    protected volatile boolean blocking = true;
> > +
> > +    public boolean isBlocking() {
> > +        return blocking;
> > +    }
> > +
> > +    public void setBlocking(boolean blocking) throws IllegalStateException {
> > +        @SuppressWarnings("rawtypes")
> > +        AbstractOutputBuffer buf = (AbstractOutputBuffer)outputBuffer;
> > +        if (!blocking && !buf.supportsNonBlocking()) throw new IllegalStateException();
> > +        this.blocking = blocking;
> > +    }
> >  }
> >
> > Modified:
> > tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/Abs
> > tractAjpProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
> > Fri Jul  6 06:53:52 2012
> > @@ -455,6 +455,8 @@ public abstract class AbstractAjpProcess
> >              ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncDispatching());
> >          } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
> >              ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
> > +        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION)
> {
> > +            ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncOperation());
> >          } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
> >              ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncTimingOut());
> >          } else if (actionCode == ActionCode.UPGRADE) {
> >
> > Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/Ajp
> > NioProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java Fri
> Jul
> > 6 06:53:52 2012
> > @@ -252,8 +252,7 @@ public class AjpNioProcessor extends Abs
> >
> >          if (actionCode == ActionCode.ASYNC_COMPLETE) {
> >              if (asyncStateMachine.asyncComplete()) {
> > -                ((NioEndpoint)endpoint).processSocket(this.socket,
> > -                        SocketStatus.OPEN_READ, false);
> > +                ((NioEndpoint)endpoint).dispatchForEvent(socket,
> > SocketStatus.OPEN_READ, false);
> >              }
> >          } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
> >              if (param == null) return;
> > @@ -264,11 +263,9 @@ public class AjpNioProcessor extends Abs
> >              }
> >          } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
> >              if (asyncStateMachine.asyncDispatch()) {
> > -                ((NioEndpoint)endpoint).processSocket(this.socket,
> > -                        SocketStatus.OPEN_READ, true);
> > +                ((NioEndpoint)endpoint).dispatchForEvent(socket,
> > SocketStatus.OPEN_READ, true);            }
> >              }
> >          }
> > -    }
> >
> >
> >      // ------------------------------------------------------
> Protected
> > Methods
> >
> > Modified:
> >
> tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> >
> AbstractHttp11Processor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > ---
> >
> tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
> > (original)
> > +++
> >
> tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
> > Fri Jul  6 06:53:52 2012
> > @@ -832,6 +832,8 @@ public abstract class AbstractHttp11Proc
> >              ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncDispatching());
> >          } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
> >              ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
> > +        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION)
> {
> > +            ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncOperation());
> >          } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
> >              ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncTimingOut());
> >          } else if (actionCode == ActionCode.UPGRADE) {
> >
> > Modified:
> > tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> > AbstractOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > ---
> tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
> > (original)
> > +++
> tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
> > Fri Jul  6 06:53:52 2012
> > @@ -546,4 +546,9 @@ public abstract class AbstractOutputBuff
> >          }
> >      }
> >
> > +    // ---------------------------------------------------------
> Public
> > Methods
> > +
> > +
> > +    public abstract boolean supportsNonBlocking();
> > +
> >  }
> >
> > Modified:
> > tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> > Http11NioProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
> > Fri Jul  6 06:53:52 2012
> > @@ -20,9 +20,11 @@ import java.io.IOException;
> >  import java.io.InterruptedIOException;
> >  import java.net.InetAddress;
> >  import java.nio.channels.SelectionKey;
> > +import java.util.concurrent.atomic.AtomicBoolean;
> >
> >  import javax.net.ssl.SSLEngine;
> >  import javax.servlet.ReadListener;
> > +import javax.servlet.WriteListener;
> >
> >  import org.apache.coyote.ActionCode;
> >  import org.apache.coyote.RequestInfo;
> > @@ -102,19 +104,7 @@ public class Http11NioProcessor extends
> >       */
> >      protected SocketWrapper<NioChannel> socket = null;
> >
> > -    /**
> > -     * TODO SERVLET 3.1
> > -     */
> > -    protected ReadListener readListener;
> > -
> > -    public ReadListener getReadListener() {
> > -        return readListener;
> > -    }
> > -
> > -    public void setReadListener(ReadListener listener) {
> > -        readListener = listener;
> > -    }
> > -
> > +    protected volatile boolean wantOnWritePossible = false;
> >
> >      // ---------------------------------------------------------
> Public
> > Methods
> >
> > @@ -185,6 +175,84 @@ public class Http11NioProcessor extends
> >      }
> >
> >
> > +
> > +
> > +    @Override
> > +    public SocketState asyncDispatch(SocketStatus status) {
> > +        final NioEndpoint.KeyAttachment attach =
> > (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
> > +
> > +
> > +        if (status == SocketStatus.OPEN_WRITE) {
> > +            try {
> > +                asyncStateMachine.asyncOperation();
> > +                try {
> > +                    if (outputBuffer.hasDataToWrite()) {
> > +                        //System.out.println("Attempting data
> > flush!!");
> > +                        outputBuffer.flushBuffer(false);
> > +                    }
> > +                }catch (IOException x) {
> > +                    if (log.isDebugEnabled()) log.debug("Unable to
> > write async data.",x);
> > +                    //TODO FIXME-- fix - so we can notify of error
> > +                    return SocketState.CLOSED;
> > +                }
> > +                //return if we have more data to write
> > +                if (isRegisteredForWrite(attach)) {
> > +                    return SocketState.LONG;
> > +                }
> > +            }catch (IllegalStateException x) {
> > +            }
> > +        } else if (status == SocketStatus.OPEN_READ) {
> > +            try {
> > +                try {
> > +                    if (inputBuffer.nbRead()>0) {
> > +                        asyncStateMachine.asyncOperation();
> > +                    }
> > +                }catch (IOException x) {
> > +                    if (log.isDebugEnabled()) log.debug("Unable to
> read
> > async data.",x);
> > +                  //TODO FIXME-- fix - so we can notify of error
> > +                    return SocketState.CLOSED;
> > +                }
> > +                //return if we have more data to write
> > +            }catch (IllegalStateException x) {
> > +            }
> > +        }
> > +
> > +        SocketState state = super.asyncDispatch(status);
> > +        //return if we have more data to write
> > +        if (isRegisteredForWrite(attach)) {
> > +            return SocketState.LONG;
> > +        } else {
> > +            return state;
> > +        }
> > +    }
> > +
> > +
> > +
> > +    @Override
> > +    public SocketState process(SocketWrapper<NioChannel>
> socketWrapper)
> > throws IOException {
> > +        SocketState state = super.process(socketWrapper);
> > +        final NioEndpoint.KeyAttachment attach =
> > (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
> > +        //return if we have more data to write
> > +        if (isRegisteredForWrite(attach)) {
> > +            return SocketState.LONG;
> > +        } else {
> > +            return state;
> > +        }
> > +    }
> > +
> > +
> > +
> > +
> > +    protected boolean isRegisteredForWrite(KeyAttachment attach) {
> > +        //return if we have more data to write
> > +        if (outputBuffer.hasDataToWrite()) {
> > +            attach.interestOps(SelectionKey.OP_WRITE);
> > +            return true;
> > +        } else {
> > +            return false;
> > +        }
> > +    }
> > +
> >      @Override
> >      protected void resetTimeouts() {
> >          final NioEndpoint.KeyAttachment attach =
> > (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
> > @@ -305,11 +373,14 @@ public class Http11NioProcessor extends
> >      }
> >
> >
> > +
> > +
> >      @Override
> >      public void recycleInternal() {
> >          socket = null;
> >          comet = false;
> >          sendfileData = null;
> > +        wantOnWritePossible = false;
> >      }
> >
> >
> > @@ -492,8 +563,7 @@ public class Http11NioProcessor extends
> >              }
> >          } else if (actionCode == ActionCode.ASYNC_COMPLETE) {
> >              if (asyncStateMachine.asyncComplete()) {
> > -
> > ((NioEndpoint)endpoint).processSocket(this.socket.getSocket(),
> > -                        SocketStatus.OPEN_READ, true);
> > +
> >
> ((NioEndpoint)endpoint).dispatchForEvent(this.socket.getSocket(),SocketS
> > tatus.OPEN_READ, true);
> >              }
> >          } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
> >              if (param==null) {
> > @@ -508,14 +578,15 @@ public class Http11NioProcessor extends
> >              attach.setTimeout(timeout);
> >          } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
> >              if (asyncStateMachine.asyncDispatch()) {
> > -
> > ((NioEndpoint)endpoint).processSocket(this.socket.getSocket(),
> > -                        SocketStatus.OPEN_READ, true);
> > +
> >
> ((NioEndpoint)endpoint).dispatchForEvent(this.socket.getSocket(),SocketS
> > tatus.OPEN_READ, true);
> >              }
> > -        } else if (actionCode ==
> > ActionCode.ASYNC_DISPATCH_FOR_OPERATION) {
> > -            asyncStateMachine.asyncOperation();
> >          } else if (actionCode == ActionCode.SET_READ_LISTENER) {
> >              ReadListener listener = (ReadListener)param;
> >              request.setReadListener(listener);
> > +        } else if (actionCode == ActionCode.SET_WRITE_LISTENER) {
> > +            WriteListener listener = (WriteListener)param;
> > +            response.setWriteListener(listener);
> > +            outputBuffer.setBlocking(listener==null);
> >          } else if (actionCode == ActionCode.NB_READ_INTEREST) {
> >              if (socket==null ||
> > socket.getSocket().getAttachment(false)==null) {
> >                  return;
> > @@ -524,8 +595,27 @@ public class Http11NioProcessor extends
> >              if (rp.getStage() ==
> > org.apache.coyote.Constants.STAGE_SERVICE) {
> >                  NioEndpoint.KeyAttachment attach =
> > (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
> >                  attach.interestOps(attach.interestOps() |
> > SelectionKey.OP_READ);
> > +            } else {
> > +                throw new IllegalStateException("Calling isReady
> > asynchronously is illegal.");
> >              }
> > -
> > +        } else if (actionCode == ActionCode.NB_WRITE_INTEREST) {
> > +            if (socket==null ||
> > socket.getSocket().getAttachment(false)==null) {
> > +                return;
> > +        }
> > +            AtomicBoolean canWrite = (AtomicBoolean)param;
> > +            RequestInfo rp = request.getRequestProcessor();
> > +            if (rp.getStage() ==
> > org.apache.coyote.Constants.STAGE_SERVICE) {
> > +                if (outputBuffer.isWritable()) {
> > +                    canWrite.set(true);
> > +                } else {
> > +                    canWrite.set(false);
> > +                    wantOnWritePossible = true;
> > +    }
> > +            } else {
> > +                throw new IllegalStateException("Calling canWrite
> > asynchronously is illegal.");
> > +            }
> > +        } else if (actionCode ==
> > ActionCode.ASYNC_DISPATCH_FOR_OPERATION) {
> > +            asyncStateMachine.asyncOperation();
> >          }
> >      }
> >
> >
> > Modified:
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> >
> InternalAprOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > ---
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
> > (original)
> > +++
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
> > Fri Jul  6 06:53:52 2012
> > @@ -86,6 +86,11 @@ public class InternalAprOutputBuffer ext
> >      // ---------------------------------------------------------
> Public
> > Methods
> >
> >      @Override
> > +    public boolean supportsNonBlocking() {
> > +        return false;
> > +    }
> > +
> > +    @Override
> >      public void init(SocketWrapper<Long> socketWrapper,
> >              AbstractEndpoint endpoint) throws IOException {
> >
> >
> > Modified:
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> >
> InternalNioOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > ---
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
> > (original)
> > +++
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
> > Fri Jul  6 06:53:52 2012
> > @@ -21,6 +21,12 @@ import java.io.IOException;
> >  import java.nio.ByteBuffer;
> >  import java.nio.channels.SelectionKey;
> >  import java.nio.channels.Selector;
> > +import java.util.Collections;
> > +import java.util.Iterator;
> > +import java.util.LinkedList;
> > +import java.util.concurrent.LinkedBlockingDeque;
> > +import java.util.concurrent.atomic.AtomicBoolean;
> > +import java.util.concurrent.atomic.AtomicInteger;
> >
> >  import org.apache.coyote.OutputBuffer;
> >  import org.apache.coyote.Response;
> > @@ -76,9 +82,80 @@ public class InternalNioOutputBuffer ext
> >       */
> >      private NioSelectorPool pool;
> >
> > +    /**
> > +     * Flag used only for Comet requests/responses
> > +     */
> > +    protected volatile boolean blocking = true;
> > +
> > +    /**
> > +     * Track if the byte buffer is flipped
> > +     */
> > +    protected volatile boolean flipped = false;
> > +
> > +    /**
> > +     * For "non-blocking" writes use an external buffer
> > +     */
> > +    protected volatile LinkedBlockingDeque<ByteBufferHolder>
> > bufferedWrite = null;
> > +
> > +    /**
> > +     * The max size of the buffered write buffer
> > +     */
> > +    protected int bufferedWriteSize = 64*1024; //64k default write buffer
> > +
> > +    /**
> > +     * Number of bytes last written
> > +     */
> > +    protected AtomicInteger lastWrite = new AtomicInteger(1);
> > +
> > +    protected class ByteBufferHolder {
> > +        private ByteBuffer buf;
> > +        private AtomicBoolean flipped;
> > +        public ByteBufferHolder(ByteBuffer buf, boolean flipped) {
> > +           this.buf = buf;
> > +           this.flipped = new AtomicBoolean(flipped);
> > +        }
> > +        public ByteBuffer getBuf() {
> > +            return buf;
> > +        }
> > +        public boolean isFlipped() {
> > +            return flipped.get();
> > +        }
> > +
> > +        public boolean flip() {
> > +            if (flipped.compareAndSet(false, true)) {
> > +                buf.flip();
> > +                return true;
> > +            } else {
> > +                return false;
> > +            }
> > +        }
> > +
> > +        public boolean hasData() {
> > +            if (flipped.get()) {
> > +                return buf.remaining()>0;
> > +            } else {
> > +                return buf.position()>0;
> > +            }
> > +        }
> > +
> > +        @Override
> > +        public String toString() {
> > +            StringBuilder builder = new StringBuilder(super.toString());
> > +            builder.append("[flipped=");
> > +            builder.append(isFlipped()?"true, remaining=" : "false, position=");
> > +            builder.append(isFlipped()? buf.remaining(): buf.position());
> > +            builder.append("]");
> > +            return builder.toString();
> > +        }
> > +
> > +    }
> >
> >      // --------------------------------------------------------- Public Methods
> >
> > +    @Override
> > +    public boolean supportsNonBlocking() {
> > +        return true;
> > +    }
> >
> >      /**
> >       * Flush the response.
> > @@ -91,7 +168,7 @@ public class InternalNioOutputBuffer ext
> >
> >          super.flush();
> >          // Flush the current buffer
> > -        flushBuffer();
> > +        flushBuffer(isBlocking());
> >
> >      }
> >
> > @@ -107,6 +184,9 @@ public class InternalNioOutputBuffer ext
> >              socket.getBufHandler().getWriteBuffer().clear();
> >              socket = null;
> >          }
> > +        lastWrite.set(1);
> > +        setBlocking(true);
> > +        flipped = false;
> >      }
> >
> >
> > @@ -118,7 +198,7 @@ public class InternalNioOutputBuffer ext
> >      @Override
> >      public void endRequest() throws IOException {
> >          super.endRequest();
> > -        flushBuffer();
> > +        flushBuffer(true);
> >      }
> >
> >      // ------------------------------------------------ HTTP/1.1 Output Methods
> > @@ -146,8 +226,12 @@ public class InternalNioOutputBuffer ext
> >       * @throws IOException
> >       * TODO Fix non blocking write properly
> >       */
> > +    int total = 0;
> >      private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
> > -        if ( flip ) bytebuffer.flip();
> > +        if ( flip ) {
> > +            bytebuffer.flip();
> > +            flipped = true;
> > +        }
> >
> >          int written = 0;
> >          NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
> > @@ -168,7 +252,14 @@ public class InternalNioOutputBuffer ext
> >          }finally {
> >              if ( selector != null ) pool.put(selector);
> >          }
> > -        if ( block ) bytebuffer.clear(); //only clear
> > +        if ( block || bytebuffer.remaining()==0) {
> > +            //blocking writes must empty the buffer
> > +            //and if remaining==0 then we did empty it
> > +            bytebuffer.clear();
> > +            flipped = false;
> > +        }
> > +        total+= written;
> > +        //System.out.println("Successful write("+written+ " / "+total);
> >          return written;
> >      }
> >
> > @@ -204,31 +295,48 @@ public class InternalNioOutputBuffer ext
> >
> >      }
> >
> > +
> >      private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
> > -        while (length > 0) {
> > -            int thisTime = length;
> > -            if (socket.getBufHandler().getWriteBuffer().position() ==
> > -                    socket.getBufHandler().getWriteBuffer().capacity()
> > -                    || socket.getBufHandler().getWriteBuffer().remaining()==0) {
> > -                flushBuffer();
> > -            }
> > -            if (thisTime > socket.getBufHandler().getWriteBuffer().remaining()) {
> > -                thisTime = socket.getBufHandler().getWriteBuffer().remaining();
> > -            }
> > -            socket.getBufHandler().getWriteBuffer().put(buf, offset, thisTime);
> > +        //try to write to socket first
> > +        if (length==0) return;
> > +
> > +        boolean dataLeft = flushBuffer(isBlocking());
> > +
> > +        while (!dataLeft && length>0) {
> > +            int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer());
> >              length = length - thisTime;
> >              offset = offset + thisTime;
> > +            writeToSocket(socket.getBufHandler().getWriteBuffer(), isBlocking(), true);
> > +            dataLeft = flushBuffer(isBlocking());
> >          }
> > +
> >          NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
> >          if ( ka!= null ) ka.access();//prevent timeouts for just doing client writes
> > +
> > +        if (!isBlocking() && length>0) {
> > +            //we must buffer as long as it fits
> > +            //ByteBufferHolder tail = bufferedWrite.
> > +            addToBuffers(buf, offset, length);
> > +    }
> > +    }
> > +
> > +    private void addToBuffers(byte[] buf, int offset, int length) {
> > +        ByteBufferHolder holder = bufferedWrite.peekLast();
> > +        if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
> > +            ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
> > +            holder = new ByteBufferHolder(buffer,false);
> > +            bufferedWrite.add(holder);
> > +        }
> > +        holder.getBuf().put(buf,offset,length);
> >      }
> >
> >
> >      /**
> >       * Callback to write data from the buffer.
> >       */
> > -    private void flushBuffer() throws IOException {
> > +    protected boolean flushBuffer(boolean block) throws IOException {
> >
> > +        int result = 0;
> >          //prevent timeout for async,
> >          SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
> >          if (key != null) {
> > @@ -236,16 +344,61 @@ public class InternalNioOutputBuffer ext
> >              attach.access();
> >          }
> >
> > +        boolean dataLeft = hasMoreDataToFlush();
> > +
> >          //write to the socket, if there is anything to write
> > -        if (socket.getBufHandler().getWriteBuffer().position() > 0) {
> > -            socket.getBufHandler().getWriteBuffer().flip();
> > -            writeToSocket(socket.getBufHandler().getWriteBuffer(),true, false);
> > +        if ( dataLeft ) {
> > +            result = writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped);
> >          }
> > +
> > +        dataLeft = hasMoreDataToFlush();
> > +
> > +        if (!dataLeft && bufferedWrite!=null) {
> > +            Iterator<ByteBufferHolder> bufIter = bufferedWrite.iterator();
> > +            while (!hasMoreDataToFlush() && bufIter.hasNext()) {
> > +                ByteBufferHolder buffer = bufIter.next();
> > +                buffer.flip();
> > +                while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
> > +                    transfer(buffer.getBuf(), socket.getBufHandler().getWriteBuffer());
> > +                    if (buffer.getBuf().remaining() == 0) {
> > +                        bufIter.remove();
> >      }
> > +                    result = writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true);
> > +                    //here we must break if we didn't finish the write
> >
> > +                }
> > +            }
> > +        }
> > +
> > +        dataLeft = hasMoreDataToFlush();
> > +
> > +        return dataLeft;
> > +    }
> > +
> > +    private boolean hasMoreDataToFlush() {
> > +        return (flipped && socket.getBufHandler().getWriteBuffer().remaining()>0) ||
> > +        (!flipped && socket.getBufHandler().getWriteBuffer().position() > 0);
> > +    }
> > +
> > +    private int transfer(byte[] from, int offset, int length, ByteBuffer to) {
> > +        int max = Math.min(length, to.remaining());
> > +        ByteBuffer tmp = ByteBuffer.wrap(from, offset, max);
> > +        tmp.limit (tmp.position() + max);
> > +        to.put (tmp);
> > +        return max;
> > +    }
> > +
> > +    private int transfer(ByteBuffer from, ByteBuffer to) {
> > +        int max = Math.min(from.remaining(), to.remaining());
> > +        ByteBuffer tmp = from.duplicate ();
> > +        tmp.limit (tmp.position() + max);
> > +        to.put (tmp);
> > +        from.position(from.position() + max);
> > +        return max;
> > +    }
> >
> > -    // ----------------------------------- OutputStreamOutputBuffer Inner Class
> >
> > +    // ----------------------------------- OutputStreamOutputBuffer Inner Class
> >
> >      /**
> >       * This class is an output buffer which will write data to an output
> > @@ -275,4 +428,44 @@ public class InternalNioOutputBuffer ext
> >              return byteCount;
> >          }
> >      }
> > +
> > +    //----------------------------------------non blocking writes ---
> --
> > ------------
> > +    public void setBlocking(boolean blocking) {
> > +        this.blocking = blocking;
> > +        if (blocking)
> > +            bufferedWrite = null;
> > +        else
> > +            bufferedWrite = new LinkedBlockingDeque<ByteBufferHolder>();
> > +}
> > +
> > +    public void setBufferedWriteSize(int bufferedWriteSize) {
> > +        this.bufferedWriteSize = bufferedWriteSize;
> > +    }
> > +
> > +    public boolean isBlocking() {
> > +        return blocking;
> > +    }
> > +
> > +    private boolean hasBufferedData() {
> > +        boolean result = false;
> > +        if (bufferedWrite!=null) {
> > +            Iterator<ByteBufferHolder> iter = bufferedWrite.iterator();
> > +            while (!result && iter.hasNext()) {
> > +                result = iter.next().hasData();
> > +            }
> > +        }
> > +        return result;
> > +    }
> > +
> > +    public boolean hasDataToWrite() {
> > +        return hasMoreDataToFlush() || hasBufferedData();
> > +    }
> > +
> > +    public int getBufferedWriteSize() {
> > +        return bufferedWriteSize;
> > +    }
> > +
> > +    public boolean isWritable() {
> > +        return !hasDataToWrite();
> > +    }
> >  }
> >
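For anyone reading along, the two buffer helpers in the diff above can be tried outside Tomcat. What follows is only a minimal standalone sketch, not the committed code: the class name `BufferTransferSketch`, the simplified `Holder` class and the buffer sizes are made up for illustration. It shows a bounded copy between two `ByteBuffer`s (copy only what the destination can take, then advance the source) and a holder that guards `flip()` with `compareAndSet` so a buffer is flipped at most once.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

public class BufferTransferSketch {

    /** Copies as many bytes as fit from 'from' into 'to' and returns the count. */
    static int transfer(ByteBuffer from, ByteBuffer to) {
        int max = Math.min(from.remaining(), to.remaining());
        ByteBuffer tmp = from.duplicate();     // same content, independent position/limit
        tmp.limit(tmp.position() + max);       // cap the view so put() cannot overflow 'to'
        to.put(tmp);
        from.position(from.position() + max);  // advance the source past the copied bytes
        return max;
    }

    /** Hypothetical, simplified holder: pairs a buffer with a "flipped" flag. */
    static final class Holder {
        private final ByteBuffer buf;
        private final AtomicBoolean flipped = new AtomicBoolean(false);

        Holder(ByteBuffer buf) {
            this.buf = buf;
        }

        /** Flips the buffer only on the first call; later calls return false. */
        boolean flip() {
            if (flipped.compareAndSet(false, true)) {
                buf.flip();
                return true;
            }
            return false;
        }

        /** In write mode, data means position > 0; in read mode, remaining > 0. */
        boolean hasData() {
            return flipped.get() ? buf.remaining() > 0 : buf.position() > 0;
        }
    }

    public static void main(String[] args) {
        Holder pending = new Holder(ByteBuffer.allocate(16));
        pending.buf.put("0123456789".getBytes(StandardCharsets.US_ASCII));
        pending.flip();
        pending.flip(); // no effect, the buffer is already in read mode

        ByteBuffer socketBuffer = ByteBuffer.allocate(4); // deliberately small
        int copied = transfer(pending.buf, socketBuffer);
        System.out.println(copied + " bytes copied, " + pending.buf.remaining() + " still pending");
    }
}
```

The point of the guard is that calling `flip()` twice on a plain `ByteBuffer` sets the limit to the current position, which would silently drop the pending bytes.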
> > Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> > ==============================================================================
> > --- tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java (original)
> > +++ tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java Fri Jul  6 06:53:52 2012
> > @@ -96,6 +96,11 @@ public class InternalOutputBuffer extend
> >      // --------------------------------------------------------- Public Methods
> >
> >      @Override
> > +    public boolean supportsNonBlocking() {
> > +        return false;
> > +    }
> > +
> > +    @Override
> >      public void init(SocketWrapper<Socket> socketWrapper,
> >              AbstractEndpoint endpoint) throws IOException {
> >
> >
> > Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> > ==============================================================================
> > --- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java (original)
> > +++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Fri Jul  6 06:53:52 2012
> > @@ -386,6 +386,8 @@ public class SpdyProcessor extends Abstr
> >              ((AtomicBoolean) param).set(asyncStateMachine.isAsyncDispatching());
> >          } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
> >              ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
> > +        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION) {
> > +            ((AtomicBoolean) param).set(asyncStateMachine.isAsyncOperation());
> >          } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
> >              ((AtomicBoolean) param).set(asyncStateMachine.isAsyncTimingOut());
> >          } else {
> >
> > Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> > ==============================================================================
> > --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
> > +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Fri Jul  6 06:53:52 2012
> > @@ -716,7 +716,16 @@ public class NioEndpoint extends Abstrac
> >          return true;
> >      }
> >
> > -    public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
> > +    public boolean dispatchForEvent(NioChannel socket, SocketStatus status, boolean dispatch) {
> > +        if (!dispatch) {
> > +            processSocket(socket,status,dispatch);
> > +        } else {
> > +            socket.getPoller().add(socket, OP_CALLBACK);
> > +        }
> > +        return true;
> > +    }
> > +
> > +    protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
> >          try {
> >              KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
> >              if (attachment == null) {
> > @@ -900,7 +909,7 @@ public class NioEndpoint extends Abstrac
> >                          final KeyAttachment att = (KeyAttachment) key.attachment();
> >                          if ( att!=null ) {
> >                              //handle callback flag
> > -                            if (att.getComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
> > +                            if ((interestOps & OP_CALLBACK) == OP_CALLBACK ) {
> >                                  att.setCometNotify(true);
> >                              } else {
> >                                  att.setCometNotify(false);
> > @@ -910,7 +919,8 @@ public class NioEndpoint extends Abstrac
> >                              //we are registering the key to start with, reset the fairness counter.
> >                              int ops = key.interestOps() | interestOps;
> >                              att.interestOps(ops);
> > -                            key.interestOps(ops);
> > +                            if (att.getCometNotify()) key.interestOps(0);
> > +                            else key.interestOps(ops);
> >                          } else {
> >                              cancel = true;
> >                          }
> > @@ -979,10 +989,6 @@ public class NioEndpoint extends Abstrac
> >          public void cometInterest(NioChannel socket) {
> >              KeyAttachment att = (KeyAttachment)socket.getAttachment(false);
> >              add(socket,att.getCometOps());
> > -            if ( (att.getCometOps()&OP_CALLBACK) == OP_CALLBACK ) {
> > -                nextExpiration = 0; //force the check for faster callback
> > -                selector.wakeup();
> > -            }
> >          }
> >
> >          /**
> > @@ -1001,6 +1007,9 @@ public class NioEndpoint extends Abstrac
> >              PollerEvent r = eventCache.poll();
> >              if ( r==null) r = new PollerEvent(socket,null,interestOps);
> >              else r.reset(socket,null,interestOps);
> > +            if ( (interestOps&OP_CALLBACK) == OP_CALLBACK ) {
> > +                nextExpiration = 0; //force the check for faster callback
> > +            }
> >              addEvent(r);
> >              if (close) {
> >                  processSocket(socket, SocketStatus.STOP, false);
> > @@ -1256,7 +1265,7 @@ public class NioEndpoint extends Abstrac
> >
> >                                  boolean readAndWrite = sk.isReadable() && sk.isWritable();
> >                                  reg(sk, attachment, 0);
> > -                                if (readAndWrite) {
> > +                                if (attachment.isAsync() && readAndWrite) {
> >                                      //remember the that we want to know about write too
> >                                      attachment.interestOps(SelectionKey.OP_WRITE);
> >                                  }
> > @@ -1406,7 +1415,7 @@ public class NioEndpoint extends Abstrac
> >              // - the selector simply timed out (suggests there isn't much load)
> >              // - the nextExpiration time has passed
> >              // - the server socket is being closed
> > -            if ((keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
> > +            if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
> >                  return;
> >              }
> >              //timeout
> > @@ -1421,10 +1430,11 @@ public class NioEndpoint extends Abstrac
> >                          cancelledKey(key, SocketStatus.ERROR); //we don't support any keys without attachments
> >                      } else if ( ka.getError() ) {
> >                          cancelledKey(key, SocketStatus.ERROR);//TODO this is not yet being used
> > -                    } else if (ka.getComet() && ka.getCometNotify() ) {
> > +                    } else if (ka.getCometNotify() ) {
> >                          ka.setCometNotify(false);
> > -                        reg(key,ka,0);//avoid multiple calls, this gets reregistered after invocation
> > -                        //if (!processSocket(ka.getChannel(), SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT);
> > +                        int ops = ka.interestOps() & ~OP_CALLBACK;;
> > +                        reg(key,ka,0);//avoid multiple calls, this gets re-registered after invocation
> > +                        ka.interestOps(ops);
> >                          if (!processSocket(ka.getChannel(), SocketStatus.OPEN_READ, true)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT, true);
> >                      } else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ ||
> >                          (ka.interestOps()&SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
> >
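The interest-ops handling in the NioEndpoint hunks above is plain bit masking, so here is a small sketch of just that part. It is an illustration under assumptions, not the endpoint code: the class name `InterestOpsSketch`, the helper names and the value `0x200` for `OP_CALLBACK` are made up for the example. The one hard requirement is that the marker bit must not collide with the `SelectionKey.OP_*` constants.

```java
import java.nio.channels.SelectionKey;

public class InterestOpsSketch {

    // Assumed value for illustration only; any bit outside the SelectionKey.OP_* range would do.
    static final int OP_CALLBACK = 0x200;

    /** True when the callback marker bit is set in the given ops. */
    static boolean wantsCallback(int ops) {
        return (ops & OP_CALLBACK) == OP_CALLBACK;
    }

    /** Clears the marker bit and leaves the real selector ops untouched. */
    static int withoutCallback(int ops) {
        return ops & ~OP_CALLBACK;
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE | OP_CALLBACK;
        System.out.println("callback requested: " + wantsCallback(ops));

        int selectorOps = withoutCallback(ops);
        System.out.println("callback bit cleared: " + !wantsCallback(selectorOps));
        System.out.println("read/write kept: "
                + (selectorOps == (SelectionKey.OP_READ | SelectionKey.OP_WRITE)));
    }
}
```

A private marker bit like this can live in the attachment's copy of the ops, but it should never be handed to `SelectionKey.interestOps(int)`, which throws `IllegalArgumentException` for bits the channel does not support.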
> > Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> > ==============================================================================
> > --- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java (original)
> > +++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Fri Jul  6 06:53:52 2012
> > @@ -478,7 +478,7 @@ public class SecureNioChannel extends Ni
> >              int written = sc.write(src);
> >              return written;
> >          } else {
> > -            //make sure we can handle expand, and that we only use on buffer
> > +            //make sure we can handle expand, and that we only use one buffer
> >              if ( (!this.isSendFile()) && (src != bufHandler.getWriteBuffer()) ) throw new IllegalArgumentException("You can only write using the application write buffer provided by the handler.");
> >              //are we closing or closed?
> >              if ( closing || closed) throw new IOException("Channel is in closing state.");
> >
> > Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> > ==============================================================================
> > --- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original)
> > +++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Fri Jul  6 06:53:52 2012
> > @@ -17,10 +17,10 @@
> >  package org.apache.catalina.nonblocking;
> >
> >  import java.io.IOException;
> > +import java.util.Arrays;
> >  import java.util.HashMap;
> >  import java.util.List;
> >  import java.util.Map;
> > -import java.util.Random;
> >
> >  import javax.servlet.AsyncContext;
> >  import javax.servlet.AsyncEvent;
> > @@ -28,14 +28,12 @@ import javax.servlet.AsyncListener;
> >  import javax.servlet.ReadListener;
> >  import javax.servlet.ServletException;
> >  import javax.servlet.ServletInputStream;
> > +import javax.servlet.ServletOutputStream;
> >  import javax.servlet.WriteListener;
> >  import javax.servlet.annotation.WebServlet;
> >  import javax.servlet.http.HttpServletRequest;
> >  import javax.servlet.http.HttpServletResponse;
> >
> > -import org.junit.Assert;
> > -import org.junit.Test;
> > -
> >  import org.apache.catalina.Wrapper;
> >  import org.apache.catalina.core.StandardContext;
> >  import org.apache.catalina.startup.BytesStreamer;
> > @@ -44,9 +42,14 @@ import org.apache.catalina.startup.Tomca
> >  import org.apache.catalina.startup.TomcatBaseTest;
> >  import org.apache.coyote.http11.Http11NioProtocol;
> >  import org.apache.tomcat.util.buf.ByteChunk;
> > +import org.apache.tomcat.util.buf.ByteChunk.ByteOutputChannel;
> > +import org.junit.Assert;
> > +import org.junit.Test;
> >
> >  public class TestNonBlockingAPI extends TomcatBaseTest {
> >
> > +    public static final long bytesToDownload = 1024 * 1024 * 5;
> > +
> >      @Override
> >      protected String getProtocol() {
> >          return Http11NioProtocol.class.getName();
> > @@ -58,31 +61,89 @@ public class TestNonBlockingAPI extends
> >      }
> >
> >      @Test
> > -    public void testOne() throws Exception {
> > +    public void testNonBlockingRead() throws Exception {
> >          // Configure a context with digest auth and a single protected resource
> >          Tomcat tomcat = getTomcatInstance();
> >          // Must have a real docBase - just use temp
> > -        StandardContext ctx = (StandardContext)
> > -            tomcat.addContext("", System.getProperty("java.io.tmpdir"));
> > +        StandardContext ctx = (StandardContext) tomcat.addContext("", System.getProperty("java.io.tmpdir"));
> >
> > -        NBTesterServlet servlet = new NBTesterServlet();
> > -        String servletName = NBTesterServlet.class.getName();
> > +        NBReadServlet servlet = new NBReadServlet();
> > +        String servletName = NBReadServlet.class.getName();
> >          Wrapper servletWrapper = tomcat.addServlet(ctx, servletName, servlet);
> >          ctx.addServletMapping("/", servletName);
> >
> >          tomcat.start();
> >
> > -        Map<String,List<String>> resHeaders= new HashMap<String, List<String>>();
> > -        int rc = postUrl(true, new DataWriter(),"http://localhost:" + getPort() + "/", new ByteChunk(),resHeaders, null);
> > +        Map<String, List<String>> resHeaders = new HashMap<String, List<String>>();
> > +        int rc = postUrl(true, new DataWriter(500), "http://localhost:" + getPort() + "/", new ByteChunk(),
> > +                resHeaders, null);
> >          Assert.assertEquals(HttpServletResponse.SC_OK, rc);
> >      }
> >
> > +    @Test
> > +    public void testNonBlockingWrite() throws Exception {
> > +        String bind = "localhost";
> > +        // Configure a context with digest auth and a single protected resource
> > +        Tomcat tomcat = getTomcatInstance();
> > +        // Must have a real docBase - just use temp
> > +        StandardContext ctx = (StandardContext) tomcat.addContext("", System.getProperty("java.io.tmpdir"));
> > +
> > +        NBWriteServlet servlet = new NBWriteServlet();
> > +        String servletName = NBWriteServlet.class.getName();
> > +        Wrapper servletWrapper = tomcat.addServlet(ctx, servletName, servlet);
> > +        ctx.addServletMapping("/", servletName);
> > +        tomcat.getConnector().setProperty("socket.txBufSize", "1024");
> > +        tomcat.getConnector().setProperty("address", bind);
> > +        System.out.println(tomcat.getConnector().getProperty("address"));
> > +        tomcat.start();
> > +
> > +        Map<String, List<String>> resHeaders = new HashMap<String, List<String>>();
> > +        ByteChunk slowReader = new ByteChunk();
> > +        slowReader.setLimit(1); // FIXME BUFFER IS BROKEN, 0 doesn't work
> > +        slowReader.setByteOutputChannel(new ByteOutputChannel() {
> > +            long counter = 0;
> > +            long delta = 0;
> > +
> > +            @Override
> > +            public void realWriteBytes(byte[] cbuf, int off, int len) throws IOException {
> > +                try {
> > +                    if (len == 0)
> > +                        return;
> > +                    counter += len;
> > +                    delta += len;
> > +                    if (counter > bytesToDownload) {
> > +                        System.out.println("ERROR Downloaded more than expected ERROR");
> > +                    } else if (counter == bytesToDownload) {
> > +                        System.out.println("Download complete(" + bytesToDownload + " bytes)");
> > +                        // } else if (counter > (1966086)) {
> > +                        // System.out.println("Download almost complete, missing bytes ("+counter+")");
> > +                    } else if (delta > (bytesToDownload / 16)) {
> > +                        System.out.println("Read " + counter + " bytes.");
> > +                        delta = 0;
> > +                        Thread.currentThread().sleep(500);
> > +                    }
> > +                } catch (Exception x) {
> > +                    throw new IOException(x);
> > +                }
> > +            }
> > +        });
> > +        int rc = postUrl(true, new DataWriter(0), "http://" + bind + ":" + getPort() + "/", slowReader, resHeaders,
> > +                null);
> > +        slowReader.flushBuffer();
> > +        Assert.assertEquals(HttpServletResponse.SC_OK, rc);
> > +    }
> >
> >      public static class DataWriter implements BytesStreamer {
> >          final int max = 5;
> >          int count = 0;
> > +        long delay = 0;
> >          byte[] b = "WANTMORE".getBytes();
> >          byte[] f = "FINISHED".getBytes();
> > +
> > +        public DataWriter(long delay) {
> > +            this.delay = delay;
> > +        }
> > +
> >          @Override
> >          public int getLength() {
> >              return b.length * max;
> > @@ -90,7 +151,7 @@ public class TestNonBlockingAPI extends
> >
> >          @Override
> >          public int available() {
> > -            if (count<max) {
> > +            if (count < max) {
> >                  return b.length;
> >              } else {
> >                  return 0;
> > @@ -100,9 +161,14 @@ public class TestNonBlockingAPI extends
> >          @Override
> >          public byte[] next() {
> >              if (count < max) {
> > -                if (count>0) try {Thread.sleep(6000);}catch(Exception x){}
> > +                if (count > 0)
> > +                    try {
> > +                        if (delay > 0)
> > +                            Thread.sleep(delay);
> > +                    } catch (Exception x) {
> > +                    }
> >                  count++;
> > -                if (count<max)
> > +                if (count < max)
> >                      return b;
> >                  else
> >                      return f;
> > @@ -113,12 +179,12 @@ public class TestNonBlockingAPI extends
> >
> >      }
> >
> > -    @WebServlet(asyncSupported=true)
> > -    public static class NBTesterServlet extends TesterServlet {
> > +    @WebServlet(asyncSupported = true)
> > +    public static class NBReadServlet extends TesterServlet {
> >
> >          @Override
> >          protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
> > -            //step 1 - start async
> > +            // step 1 - start async
> >              AsyncContext actx = req.startAsync();
> >              actx.setTimeout(Long.MAX_VALUE);
> >              actx.addListener(new AsyncListener() {
> > @@ -147,7 +213,7 @@ public class TestNonBlockingAPI extends
> >
> >                  }
> >              });
> > -            //step 2 - notify on read
> > +            // step 2 - notify on read
> >              ServletInputStream in = req.getInputStream();
> >              ReadListener rlist = new TestReadListener(actx);
> >              in.setReadListener(rlist);
> > @@ -155,17 +221,12 @@ public class TestNonBlockingAPI extends
> >              while (in.isReady()) {
> >                  rlist.onDataAvailable();
> >              }
> > -            //step 3 - notify that we wish to read
> > -            //ServletOutputStream out = resp.getOutputStream();
> > -            //out.setWriteListener(new TestWriteListener(actx));
> > +            // step 3 - notify that we wish to read
> > +            // ServletOutputStream out = resp.getOutputStream();
> > +            // out.setWriteListener(new TestWriteListener(actx));
> >
> >          }
> >
> > -        @Override
> > -        protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
> > -            doGet(req,resp);
> > -        }
> > -
> >          private class TestReadListener implements ReadListener {
> >              AsyncContext ctx;
> >
> > @@ -177,9 +238,9 @@ public class TestNonBlockingAPI extends
> >              public void onDataAvailable() {
> >                  try {
> >                      ServletInputStream in = ctx.getRequest().getInputStream();
> > -                    int avail=0;
> > +                    int avail = 0;
> >                      String s = "";
> > -                    while ((avail=in.dataAvailable()) > 0) {
> > +                    while ((avail = in.dataAvailable()) > 0) {
> >                          byte[] b = new byte[avail];
> >                          in.read(b);
> >                          s += new String(b);
> > @@ -191,7 +252,7 @@ public class TestNonBlockingAPI extends
> >                      } else {
> >                          in.isReady();
> >                      }
> > -                }catch (Exception x) {
> > +                } catch (Exception x) {
> >                      x.printStackTrace();
> >                      ctx.complete();
> >                  }
> > @@ -212,9 +273,62 @@ public class TestNonBlockingAPI extends
> >              }
> >          }
> >
> > -        private class TestWriteListener implements WriteListener {
> > +    }
> > +
> > +    @WebServlet(asyncSupported = true)
> > +    public static class NBWriteServlet extends TesterServlet {
> >
> > +        @Override
> > +        protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
> > +            // step 1 - start async
> > +            AsyncContext actx = req.startAsync();
> > +            actx.setTimeout(Long.MAX_VALUE);
> > +            actx.addListener(new AsyncListener() {
> > +
> > +                @Override
> > +                public void onTimeout(AsyncEvent event) throws IOException {
> > +                    System.out.println("onTimeout");
> > +
> > +                }
> > +
> > +                @Override
> > +                public void onStartAsync(AsyncEvent event) throws IOException {
> > +                    System.out.println("onStartAsync");
> > +
> > +                }
> > +
> > +                @Override
> > +                public void onError(AsyncEvent event) throws IOException {
> > +                    System.out.println("onError");
> > +
> > +                }
> > +
> > +                @Override
> > +                public void onComplete(AsyncEvent event) throws IOException {
> > +                    System.out.println("onComplete");
> > +
> > +                }
> > +            });
> > +            // step 2 - notify on read
> > +            // ServletInputStream in = req.getInputStream();
> > +            // ReadListener rlist = new TestReadListener(actx);
> > +            // in.setReadListener(rlist);
> > +            //
> > +            // while (in.isReady()) {
> > +            // rlist.onDataAvailable();
> > +            // }
> > +            // step 3 - notify that we wish to read
> > +            ServletOutputStream out = resp.getOutputStream();
> > +            resp.setBufferSize(200 * 1024);
> > +            TestWriteListener listener = new TestWriteListener(actx);
> > +            out.setWriteListener(listener);
> > +            listener.onWritePossible();
> > +        }
> > +
> > +        private class TestWriteListener implements WriteListener {
> > +            long chunk = 1024 * 1024;
> >              AsyncContext ctx;
> > +            long bytesToDownload = TestNonBlockingAPI.bytesToDownload;
> >
> >              public TestWriteListener(AsyncContext ctx) {
> >                  this.ctx = ctx;
> > @@ -222,36 +336,40 @@ public class TestNonBlockingAPI extends
> >
> >              @Override
> >              public void onWritePossible() {
> > -                // TODO Auto-generated method stub
> > +                System.out.println("onWritePossible");
> > +                try {
> > +                    long left = Math.max(bytesToDownload, 0);
> > +                    long start = System.currentTimeMillis();
> > +                    long end = System.currentTimeMillis();
> > +                    long before = left;
> > +                    while (left > 0 && ctx.getResponse().getOutputStream().canWrite()) {
> > +                        byte[] b = new byte[(int) Math.min(chunk, bytesToDownload)];
> > +                        Arrays.fill(b, (byte) 'X');
> > +                        ctx.getResponse().getOutputStream().write(b);
> > +                        bytesToDownload -= b.length;
> > +                        left = Math.max(bytesToDownload, 0);
> > +                    }
> > +                    System.out
> > +                            .println("Write took:" + (end - start) + " ms. Bytes before=" + before + " after=" + left);
> > +                    // only call complete if we have emptied the buffer
> > +                    if (left == 0 && ctx.getResponse().getOutputStream().canWrite()) {
> > +                        // it is illegal to call complete
> > +                        // if there is a write in progress
> > +                        ctx.complete();
> > +                    }
> > +                } catch (Exception x) {
> > +                    x.printStackTrace();
> > +                }
> >
> >              }
> >
> >              @Override
> >              public void onError(Throwable throwable) {
> > -                // TODO Auto-generated method stub
> > -
> > +                System.out.println("onError");
> > +                throwable.printStackTrace();
> >              }
> >
> >          }
> >
> > -
> > -
> > -    }
> > -
> > -
> > -    private static class StockQuotes {
> > -        public StockQuotes() {
> > -            super();
> > -        }
> > -        Random r = new Random(System.currentTimeMillis());
> > -
> > -        public String getNextQuote() {
> > -            return String.format("VMW: $%10.0f", r.nextDouble());
> > -        }
> > -
> > -
> > -
> >      }
> > -
> > -
> >  }
> >
> > Modified: tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> > ==============================================================================
> > --- tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java (original)
> > +++ tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java Fri Jul  6 06:53:52 2012
> > @@ -49,6 +49,7 @@ import org.apache.catalina.core.AprLifec
> >  import org.apache.catalina.core.StandardServer;
> >  import org.apache.catalina.session.StandardManager;
> >  import org.apache.catalina.valves.AccessLogValve;
> > +import org.apache.coyote.http11.Http11NioProtocol;
> >  import org.apache.tomcat.util.buf.ByteChunk;
> >
> >  /**
> > @@ -141,9 +142,9 @@ public abstract class TomcatBaseTest ext
> >          // Has a protocol been specified
> >          String protocol = System.getProperty("tomcat.test.protocol");
> >
> > -        // Use BIO by default
> > +        // Use NIO by default in Tomcat 8
> >          if (protocol == null) {
> > -            protocol = "org.apache.coyote.http11.Http11Protocol";
> > +            protocol = Http11NioProtocol.class.getName();
> >          }
> >
> >          return protocol;
> >
> >
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
> > For additional commands, e-mail: dev-help@tomcat.apache.org