tomcat-dev mailing list archives

From ma...@apache.org
Subject svn commit: r1001698 [1/2] - in /tomcat/trunk: java/org/apache/catalina/connector/ java/org/apache/catalina/core/ java/org/apache/catalina/security/ java/org/apache/coyote/ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/tomc...
Date Mon, 27 Sep 2010 12:13:33 GMT
Author: markt
Date: Mon Sep 27 12:13:32 2010
New Revision: 1001698

URL: http://svn.apache.org/viewvc?rev=1001698&view=rev
Log:
Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=49884

This required a major re-factoring of the async implementation. In summary:
- Moved state management to the Coyote Processor
- Added a SocketWrapper to the APR socket
- Added syncs to ensure only one async state change at a time
- Added syncs to ensure only one thread changing a socket's state at a time
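
As an illustration of the "only one async state change at a time" point, here is a minimal sketch of a state holder whose transitions are all guarded by a single lock. It is not the code from this commit: the class name AsyncStateSketch, its states and its method names are hypothetical and chosen only for the example. The removed AsyncContextImpl code in the diff below chained several compareAndSet() calls on an AtomicReference; the sketch instead checks and updates the state inside synchronized methods, so each check-then-change is a single atomic step.

```java
// Hypothetical sketch, not taken from the Tomcat sources.
final class AsyncStateSketch {

    // Simplified set of states, assumed for illustration only.
    enum State { NOT_ASYNC, STARTED, COMPLETING, DISPATCHING }

    private State state = State.NOT_ASYNC;

    // Every public method is synchronized on this object, so only one
    // thread at a time can inspect or change the state.
    synchronized void start() {
        transition(State.NOT_ASYNC, State.STARTED, "start");
    }

    synchronized void complete() {
        transition(State.STARTED, State.COMPLETING, "complete");
    }

    synchronized void dispatch() {
        transition(State.STARTED, State.DISPATCHING, "dispatch");
    }

    // Called once the completion or dispatch has actually been processed.
    synchronized void finished() {
        if (state != State.COMPLETING && state != State.DISPATCHING) {
            throw new IllegalStateException(
                    "finished not allowed in state " + state);
        }
        state = State.NOT_ASYNC;
    }

    synchronized boolean isAsync() {
        return state != State.NOT_ASYNC;
    }

    // Only called while the lock is held by one of the methods above.
    private void transition(State from, State to, String operation) {
        if (state != from) {
            throw new IllegalStateException(
                    operation + " not allowed in state " + state);
        }
        state = to;
    }
}
```

With this shape, a second thread calling dispatch() after another thread has already called complete() gets an IllegalStateException rather than racing on the state.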

A number of new bugs were also uncovered and fixed by this re-factoring:
- Delay processing of complete() and dispatch() until the request in which startAsync() was called has finished processing
- Fire the onStartAsync listener event
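
The first of these fixes can be pictured with the following minimal sketch. It is an assumption-laden illustration, not the implementation in this commit: DeferredCompleteSketch, its states and its methods are hypothetical names. The idea shown is that a complete() arriving while the request that called startAsync() is still being processed is only recorded, and the real completion runs when that request finishes. dispatch() could be deferred in the same way.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not taken from the Tomcat sources.
final class DeferredCompleteSketch {

    // Simplified states, assumed for illustration only.
    enum State { IDLE, STARTING, STARTED, MUST_COMPLETE, COMPLETED }

    private State state = State.IDLE;
    private final List<String> events = new ArrayList<>();

    // Called on the request thread from within the original request.
    synchronized void startAsync() {
        if (state != State.IDLE) {
            throw new IllegalStateException("startAsync in state " + state);
        }
        state = State.STARTING;
    }

    // May be called from any thread, possibly before the original
    // request has finished processing.
    synchronized void complete() {
        switch (state) {
            case STARTING:
                // Original request still running: remember, do not act yet.
                state = State.MUST_COMPLETE;
                events.add("complete deferred");
                break;
            case STARTED:
                doComplete();
                break;
            default:
                throw new IllegalStateException("complete in state " + state);
        }
    }

    // Called when the request that invoked startAsync() has finished.
    synchronized void requestFinished() {
        switch (state) {
            case STARTING:
                state = State.STARTED;
                break;
            case MUST_COMPLETE:
                // Run the completion that was deferred earlier.
                doComplete();
                break;
            default:
                break;
        }
    }

    // Only called while the lock is held.
    private void doComplete() {
        state = State.COMPLETED;
        events.add("completed");
    }

    synchronized State current() {
        return state;
    }

    synchronized List<String> events() {
        return new ArrayList<>(events);
    }
}
```

Calling startAsync(), then complete(), then requestFinished() records "complete deferred" followed by "completed", so the completion is never processed while the original request is still in flight.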

Currently, the test case for bug 49884 passes with the security manager enabled using "ab -n 5000 -c 150 -k ..."; previously it broke with "ab -n 50 -c 10 ...".

The unit tests pass for all three HTTP connectors.

The AJP connectors have only been modified to ensure the code compiles.

The following work remains:
- Testing all connectors (HTTP and AJP) with TCK + security manager and fixing whatever is broken
- Further clean-up
- Reducing code duplication between the connectors and aligning their code so it is easier to maintain

Modified:
    tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
    tomcat/trunk/java/org/apache/catalina/connector/Request.java
    tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
    tomcat/trunk/java/org/apache/catalina/security/SecurityClassLoad.java
    tomcat/trunk/java/org/apache/coyote/ActionCode.java
    tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java
    tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
    tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
    tomcat/trunk/java/org/apache/coyote/http11/Constants.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11Protocol.java
    tomcat/trunk/java/org/apache/coyote/http11/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java
    tomcat/trunk/java/org/apache/tomcat/util/net/res/LocalStrings.properties
    tomcat/trunk/test/org/apache/catalina/core/TestAsyncContextImpl.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java?rev=1001698&r1=1001697&r2=1001698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java Mon Sep 27 12:13:32 2010
@@ -260,55 +260,25 @@ public class CoyoteAdapter implements Ad
                     "Dispatch may only happen on an existing request.");
         }
         boolean comet = false;
-        boolean async = false;
         boolean success = true;
-        
+        AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
         try {
             if (status==SocketStatus.TIMEOUT) {
-                AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
-                //TODO SERVLET3 - async
-                //configure settings for timed out
-                asyncConImpl.setTimeoutState();
-            }
-            if (status==SocketStatus.ERROR || status==SocketStatus.DISCONNECT) {
-                AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
-                //TODO SERVLET3 - async
-                //configure settings for timed out
-                asyncConImpl.setErrorState(new IOException("Socket error."));
-            }
-            while (success) {
-                AsyncContextImpl impl = (AsyncContextImpl)request.getAsyncContext();
-                    // Calling the container
-                if (impl.getState()==AsyncContextImpl.AsyncState.DISPATCHED) {
-                    // Calling the container
-                    try {
-                        impl.complete();
-                        connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
-                    } finally {
-                        success = false;
-                    }
-                } else if (impl.getState()==AsyncContextImpl.AsyncState.STARTED){
-                    //TODO SERVLET3 - async
-                    res.action(ActionCode.ASYNC_START, request.getAsyncContext());
-                    async = true;
-                    break;
-                } else if (impl.getState()==AsyncContextImpl.AsyncState.NOT_STARTED){
-                    //TODO SERVLET3 - async
-                    async = false;
-                    break;
-                } else if (impl.getState()==AsyncContextImpl.AsyncState.ERROR_DISPATCHING) {
-                    async = false;
-                    success = false;
-                    connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
-                } else {
-                    try {
-                        connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
-                    } catch (RuntimeException x) {
-                        impl.setErrorState(x);
-                    }
+                success = true;
+                if (!asyncConImpl.timeout()) {
+                    asyncConImpl.setErrorState(null);
                 }
             }
-            
+            if (request.isAsyncDispatching()) {
+                success = true;
+                connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
+                Throwable t = (Throwable) request.getAttribute(
+                        Globals.EXCEPTION_ATTR);
+                if (t != null) {
+                    asyncConImpl.setErrorState(t);
+                }
+            }
+
             if (request.isComet()) {
                 if (!response.isClosed() && !response.isError()) {
                     if (request.getAvailable() || (request.getContentLength() > 0 && (!request.isParametersParsed()))) {
@@ -327,7 +297,7 @@ public class CoyoteAdapter implements Ad
                     request.setFilterChain(null);
                 }
             }
-            if (!async && !comet) {
+            if (!request.isAsync() && !comet) {
                 response.finishResponse();
                 req.action(ActionCode.POST_REQUEST , null);
             }
@@ -341,7 +311,7 @@ public class CoyoteAdapter implements Ad
         } finally {
             req.getRequestProcessor().setWorkerThreadName(null);
             // Recycle the wrapper request and response
-            if (!success || (!comet && !async)) {
+            if (!success || (!comet && !request.isAsync())) {
                 request.recycle();
                 response.recycle();
             } else {
@@ -426,15 +396,8 @@ public class CoyoteAdapter implements Ad
 
             }
             AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
-            if (asyncConImpl!=null && asyncConImpl.getState()==AsyncContextImpl.AsyncState.STARTED) {
-                res.action(ActionCode.ASYNC_START, request.getAsyncContext());
+            if (asyncConImpl != null) {
                 async = true;
-            } else if (request.isAsyncDispatching()) {
-                asyncDispatch(req, res, SocketStatus.OPEN);
-                if (request.isAsyncStarted()) {
-                    async = true;
-                    res.action(ActionCode.ASYNC_START, request.getAsyncContext());
-                }
             } else if (!comet) {
                 response.finishResponse();
                 req.action(ActionCode.POST_REQUEST , null);

Modified: tomcat/trunk/java/org/apache/catalina/connector/Request.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Request.java?rev=1001698&r1=1001697&r2=1001698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/Request.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/Request.java Mon Sep 27 12:13:32 2010
@@ -37,6 +37,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.TimeZone;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.security.auth.Subject;
 import javax.servlet.AsyncContext;
@@ -1613,12 +1614,20 @@ public class Request
         if (asyncContext == null) {
             return false;
         }
-        
-        return (asyncContext.getState()==AsyncContextImpl.AsyncState.DISPATCHING ||
-                asyncContext.getState()==AsyncContextImpl.AsyncState.TIMING_OUT  ||
-                asyncContext.getState()==AsyncContextImpl.AsyncState.STARTED     ||
-                asyncContext.getState()==AsyncContextImpl.AsyncState.ERROR_DISPATCHING ||
-                asyncContext.getState()==AsyncContextImpl.AsyncState.COMPLETING);
+
+        AtomicBoolean result = new AtomicBoolean(false);
+        coyoteRequest.action(ActionCode.ASYNC_IS_DISPATCHING, result);
+        return result.get();
+    }
+
+    public boolean isAsync() {
+        if (asyncContext == null) {
+            return false;
+        }
+
+        AtomicBoolean result = new AtomicBoolean(false);
+        coyoteRequest.action(ActionCode.ASYNC_IS_ASYNC, result);
+        return result.get();
     }
 
     @Override

Modified: tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java?rev=1001698&r1=1001697&r2=1001698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java (original)
+++ tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java Mon Sep 27 12:13:32 2010
@@ -22,7 +22,6 @@ import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.servlet.AsyncContext;
 import javax.servlet.AsyncEvent;
@@ -50,11 +49,6 @@ import org.apache.juli.logging.LogFactor
  */
 public class AsyncContextImpl implements AsyncContext {
     
-    public static enum AsyncState {
-        NOT_STARTED, STARTED, DISPATCHING, DISPATCHED, COMPLETING, TIMING_OUT,
-        TIMING_OUT_NEED_COMPLETE, ERROR_DISPATCHING
-    }
-    
     private static final Log log = LogFactory.getLog(AsyncContextImpl.class);
     
     private ServletRequest servletRequest = null;
@@ -63,7 +57,6 @@ public class AsyncContextImpl implements
     private boolean hasOriginalRequestAndResponse = true;
     private volatile Runnable dispatch = null;
     private Context context = null;
-    private AtomicReference<AsyncState> state = new AtomicReference<AsyncState>(AsyncState.NOT_STARTED);
     private long timeout = -1;
     private AsyncEvent event = null;
     
@@ -81,23 +74,46 @@ public class AsyncContextImpl implements
         if (log.isDebugEnabled()) {
             logDebug("complete   ");
         }
-        if (state.get()==AsyncState.COMPLETING) {
-            //do nothing
-        } else if (state.compareAndSet(AsyncState.DISPATCHED,
-                           AsyncState.COMPLETING) ||
-                   state.compareAndSet(AsyncState.STARTED,
-                           AsyncState.COMPLETING) ||
-                   state.compareAndSet(AsyncState.TIMING_OUT_NEED_COMPLETE,
-                           AsyncState.COMPLETING)) {
-            AtomicBoolean dispatched = new AtomicBoolean(false);
-            request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE,
-                    dispatched);
-            if (!dispatched.get()) doInternalComplete(false);
-        } else {
-            throw new IllegalStateException(
-                    "Complete not allowed. Invalid state:"+state.get());
+        request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);
+    }
+
+    public void fireOnComplete() {
+        List<AsyncListenerWrapper> listenersCopy =
+            new ArrayList<AsyncListenerWrapper>();
+        listenersCopy.addAll(listeners);
+        for (AsyncListenerWrapper listener : listenersCopy) {
+            try {
+                listener.fireOnComplete(event);
+            } catch (IOException ioe) {
+                log.warn("onComplete() failed for listener of type [" +
+                        listener.getClass().getName() + "]", ioe);
+            }
         }
-       
+    }
+    
+    public boolean timeout() throws IOException {
+        AtomicBoolean result = new AtomicBoolean();
+        request.getCoyoteRequest().action(ActionCode.ASYNC_TIMEOUT, result);
+        
+        if (result.get()) {
+            boolean listenerInvoked = false;
+            List<AsyncListenerWrapper> listenersCopy =
+                new ArrayList<AsyncListenerWrapper>();
+            listenersCopy.addAll(listeners);
+            for (AsyncListenerWrapper listener : listenersCopy) {
+                listener.fireOnTimeout(event);
+                listenerInvoked = true;
+            }
+            if (listenerInvoked) {
+                request.getCoyoteRequest().action(
+                        ActionCode.ASYNC_IS_TIMINGOUT, result);
+                return !result.get();
+            } else {
+                // No listeners, container calls complete
+                complete();
+            }
+        }
+        return true;
     }
 
     @Override
@@ -119,55 +135,37 @@ public class AsyncContextImpl implements
         if (log.isDebugEnabled()) {
             logDebug("dispatch   ");
         }
-
-        if (state.compareAndSet(AsyncState.STARTED, AsyncState.DISPATCHING) ||
-            state.compareAndSet(AsyncState.DISPATCHED, AsyncState.DISPATCHING)) {
-
-            if (request.getAttribute(ASYNC_REQUEST_URI)==null) {
-                request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI()+"?"+request.getQueryString());
-                request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath());
-                request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath());
-                request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString());
-            }
-            final RequestDispatcher requestDispatcher = context.getRequestDispatcher(path);
-            final HttpServletRequest servletRequest = (HttpServletRequest)getRequest();
-            final HttpServletResponse servletResponse = (HttpServletResponse)getResponse();
-            Runnable run = new Runnable() {
-                @Override
-                public void run() {
-                    DispatcherType type = (DispatcherType)request.getAttribute(Globals.DISPATCHER_TYPE_ATTR);
-                    try {
-                        //piggy back on the request dispatcher to ensure that filters etc get called.
-                        //TODO SERVLET3 - async should this be include/forward or a new dispatch type
-                        //javadoc suggests include with the type of DispatcherType.ASYNC
-                        request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, DispatcherType.ASYNC);
-                        requestDispatcher.include(servletRequest, servletResponse);
-                    }catch (Exception x) {
-                        //log.error("Async.dispatch",x);
-                        throw new RuntimeException(x);
-                    }finally {
-                        request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, type);
-                    }
-                }
-            };
-            this.dispatch = run;
-            AtomicBoolean dispatched = new AtomicBoolean(false);
-            request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCH, dispatched );
-            if (!dispatched.get()) {
+        if (request.getAttribute(ASYNC_REQUEST_URI)==null) {
+            request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI()+"?"+request.getQueryString());
+            request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath());
+            request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath());
+            request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString());
+        }
+        final RequestDispatcher requestDispatcher = context.getRequestDispatcher(path);
+        final HttpServletRequest servletRequest = (HttpServletRequest)getRequest();
+        final HttpServletResponse servletResponse = (HttpServletResponse)getResponse();
+        Runnable run = new Runnable() {
+            @Override
+            public void run() {
+                request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCHED, null);
+                DispatcherType type = (DispatcherType)request.getAttribute(Globals.DISPATCHER_TYPE_ATTR);
                 try {
-                    doInternalDispatch();
-                }catch (ServletException sx) {
-                    throw new RuntimeException(sx);
-                }catch (IOException ix) {
-                    throw new RuntimeException(ix);
+                    //piggy back on the request dispatcher to ensure that filters etc get called.
+                    //TODO SERVLET3 - async should this be include/forward or a new dispatch type
+                    //javadoc suggests include with the type of DispatcherType.ASYNC
+                    request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, DispatcherType.ASYNC);
+                    requestDispatcher.include(servletRequest, servletResponse);
+                }catch (Exception x) {
+                    //log.error("Async.dispatch",x);
+                    throw new RuntimeException(x);
+                }finally {
+                    request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, type);
                 }
             }
-            if (state.get().equals(AsyncState.DISPATCHED)) {
-                complete();
-            }
-        } else {
-            throw new IllegalStateException("Dispatch not allowed. Invalid state:"+state.get());
-        }
+        };
+        
+        this.dispatch = run;
+        this.request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCH, null);
     }
 
     @Override
@@ -186,40 +184,8 @@ public class AsyncContextImpl implements
             logDebug("start      ");
         }
 
-        if (state.get() ==  AsyncState.STARTED) {
-            // Execute the runnable using a container thread from the
-            // Connector's thread pool. Use a wrapper to prevent a memory leak
-            Runnable wrapper = new RunnableWrapper(run, context);
-            ClassLoader oldCL;
-            if (Globals.IS_SECURITY_ENABLED) {
-                PrivilegedAction<ClassLoader> pa = new PrivilegedGetTccl();
-                oldCL = AccessController.doPrivileged(pa);
-            } else {
-                oldCL = Thread.currentThread().getContextClassLoader();
-            }
-            try {
-                if (Globals.IS_SECURITY_ENABLED) {
-                    PrivilegedAction<Void> pa = new PrivilegedSetTccl(
-                            this.getClass().getClassLoader());
-                    AccessController.doPrivileged(pa);
-                } else {
-                    Thread.currentThread().setContextClassLoader(
-                            this.getClass().getClassLoader());
-                }
-                request.getConnector().getProtocolHandler().getExecutor(
-                        ).execute(wrapper);
-            } finally {
-                if (Globals.IS_SECURITY_ENABLED) {
-                    PrivilegedAction<Void> pa = new PrivilegedSetTccl(
-                            oldCL);
-                    AccessController.doPrivileged(pa);
-                } else {
-                    Thread.currentThread().setContextClassLoader(oldCL);
-                }
-            }
-        } else {
-            throw new IllegalStateException("Start not allowed. Invalid state:"+state.get());
-        }
+        Runnable wrapper = new RunnableWrapper(run, context);
+        this.request.getCoyoteRequest().action(ActionCode.ASYNC_RUN, wrapper);
     }
     
     @Override
@@ -259,31 +225,43 @@ public class AsyncContextImpl implements
         }
         servletRequest = null;
         servletResponse = null;
-        listeners.clear();
         hasOriginalRequestAndResponse = true;
-        state.set(AsyncState.NOT_STARTED);
         context = null;
         timeout = -1;
         event = null;
     }
 
     public boolean isStarted() {
-        return (state.get() == AsyncState.STARTED ||
-                state.get() == AsyncState.DISPATCHING);
+        AtomicBoolean result = new AtomicBoolean(false);
+        request.getCoyoteRequest().action(
+                ActionCode.ASYNC_IS_STARTED, result);
+        return result.get();
     }
 
     public void setStarted(Context context, ServletRequest request,
-            ServletResponse response, boolean hasOriginalRequestAndResponse) {
-        if (state.compareAndSet(AsyncState.NOT_STARTED, AsyncState.STARTED) ||
-                state.compareAndSet(AsyncState.DISPATCHED, AsyncState.STARTED)) {
-            this.context = context;
-            this.servletRequest = request;
-            this.servletResponse = response;
-            this.hasOriginalRequestAndResponse = hasOriginalRequestAndResponse;
-            this.event = new AsyncEvent(this, request, response); 
-        } else {
-            throw new IllegalStateException("Start illegal. Invalid state: "+state.get());
+            ServletResponse response, boolean originalRequestResponse) {
+        
+        this.request.getCoyoteRequest().action(
+                ActionCode.ASYNC_START, this);
+
+        this.context = context;
+        this.servletRequest = request;
+        this.servletResponse = response;
+        this.hasOriginalRequestAndResponse = originalRequestResponse;
+        this.event = new AsyncEvent(this, request, response);
+        
+        List<AsyncListenerWrapper> listenersCopy =
+            new ArrayList<AsyncListenerWrapper>();
+        listenersCopy.addAll(listeners);
+        for (AsyncListenerWrapper listener : listenersCopy) {
+            try {
+                listener.fireOnStartAsync(event);
+            } catch (IOException ioe) {
+                log.warn("onStartAsync() failed for listener of type [" +
+                        listener.getClass().getName() + "]", ioe);
+            }
         }
+        listeners.clear();
     }
 
     @Override
@@ -295,122 +273,53 @@ public class AsyncContextImpl implements
         if (log.isDebugEnabled()) {
             logDebug("intDispatch");
         }
-        if (this.state.compareAndSet(AsyncState.TIMING_OUT,
-                AsyncState.TIMING_OUT_NEED_COMPLETE)) {
-            log.debug("TIMING OUT!");
-            boolean listenerInvoked = false;
-            List<AsyncListenerWrapper> listenersCopy =
-                new ArrayList<AsyncListenerWrapper>();
-            listenersCopy.addAll(listeners);
-            for (AsyncListenerWrapper listener : listenersCopy) {
-                listener.fireOnTimeout(event);
-                listenerInvoked = true;
-            }
-            if (listenerInvoked) {
-                // Listener should have called complete
-                if (state.get() != AsyncState.NOT_STARTED) {
-                    ((HttpServletResponse)servletResponse).setStatus(500);
-                    state.set(AsyncState.COMPLETING);
-                    doInternalComplete(true);
-                }
-            } else {
-                // No listeners, container calls complete
-                state.set(AsyncState.COMPLETING);
-                doInternalComplete(false);
-            }
-        } else if (this.state.compareAndSet(AsyncState.ERROR_DISPATCHING, AsyncState.COMPLETING)) {
-            log.debug("ON ERROR!");
-            boolean listenerInvoked = false;
-            for (AsyncListenerWrapper listener : listeners) {
-                try {
-                    listener.fireOnError(event);
-                }catch (IllegalStateException x) {
-                    log.debug("Listener invoked invalid state.",x);
-                }catch (Exception x) {
-                    log.debug("Exception during onError.",x);
-                }
-                listenerInvoked = true;
-            }
-            if (!listenerInvoked) {
-                ((HttpServletResponse)servletResponse).setStatus(500);
-            }
-            doInternalComplete(true);
-        
-        } else if (this.state.compareAndSet(AsyncState.DISPATCHING, AsyncState.DISPATCHED)) {
-            if (this.dispatch!=null) {
-                try {
-                    dispatch.run();
-                } catch (RuntimeException x) {
-                    doInternalComplete(true);
-                    if (x.getCause() instanceof ServletException) throw (ServletException)x.getCause();
-                    if (x.getCause() instanceof IOException) throw (IOException)x.getCause();
-                    throw new ServletException(x);
-                } finally {
-                    dispatch = null;
-                }
-            }
-        } else if (this.state.get()==AsyncState.COMPLETING) {
-            doInternalComplete(false);
-        } else {
-            throw new IllegalStateException("Dispatch illegal. Invalid state: "+state.get());
-        }
-    }
-    
-    private void doInternalComplete(boolean error) {
-        if (log.isDebugEnabled()) {
-            logDebug("intComplete");
-        }
-        if (state.get()==AsyncState.NOT_STARTED) return;
-        if (state.compareAndSet(AsyncState.STARTED, AsyncState.NOT_STARTED)) {
-            //this is the same as
-            //request.startAsync().complete();
-            recycle();
-        } else if (state.compareAndSet(AsyncState.COMPLETING, AsyncState.NOT_STARTED)) {
-            for (AsyncListenerWrapper wrapper : listeners) {
-                try {
-                    wrapper.fireOnComplete(event);
-                }catch (IOException x) {
-                    //how does this propagate, or should it?
-                    //TODO SERVLET3 - async 
-                    log.error("",x);
-                }
+        try {
+            dispatch.run();
+        } catch (RuntimeException x) {
+            // doInternalComplete(true);
+            if (x.getCause() instanceof ServletException) {
+                throw (ServletException)x.getCause();
             }
-            try {
-                if (!error) getResponse().flushBuffer();
-            }catch (Exception x) {
-                log.error("",x);
+            if (x.getCause() instanceof IOException) {
+                throw (IOException)x.getCause();
             }
-            recycle();
-            
-        } else { 
-            throw new IllegalStateException("Complete illegal. Invalid state:"+state.get());
+            throw new ServletException(x);
         }
     }
-    
-    public AsyncState getState() {
-        return state.get();
-    }
+
     
     @Override
     public long getTimeout() {
         return timeout;
     }
-    
+
+
     @Override
     public void setTimeout(long timeout) {
         this.timeout = timeout;
         request.getCoyoteRequest().action(ActionCode.ASYNC_SETTIMEOUT,
                 Long.valueOf(timeout));
     }
-    
-    public void setTimeoutState() {
-        state.set(AsyncState.TIMING_OUT);
-    }
-    
+
+
     public void setErrorState(Throwable t) {
         if (t!=null) request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, t);
-        state.set(AsyncState.ERROR_DISPATCHING);
+        request.getCoyoteRequest().action(ActionCode.ASYNC_ERROR, null);
+        AsyncEvent errorEvent = new AsyncEvent(event.getAsyncContext(),
+                event.getSuppliedRequest(), event.getSuppliedResponse(), t);
+        List<AsyncListenerWrapper> listenersCopy =
+            new ArrayList<AsyncListenerWrapper>();
+        listenersCopy.addAll(listeners);
+        for (AsyncListenerWrapper listener : listenersCopy) {
+            try {
+                listener.fireOnError(errorEvent);
+            } catch (IOException ioe) {
+                log.warn("onStartAsync() failed for listener of type [" +
+                        listener.getClass().getName() + "]", ioe);
+            }
+        }
     }
+
     
     private void logDebug(String method) {
         String rHashCode;
@@ -457,7 +366,7 @@ public class AsyncContextImpl implements
                 "Req: %1$8s  CReq: %2$8s  RP: %3$8s  Stage: %4$s  " +
                 "Thread: %5$20s  State: %6$20s  Method: %7$11s  URI: %8$s",
                 rHashCode, crHashCode, rpHashCode, stage,
-                Thread.currentThread().getName(), state, method, uri);
+                Thread.currentThread().getName(), "N/A", method, uri);
         if (log.isTraceEnabled()) {
             log.trace(msg, new DebugException());
         } else {

Modified: tomcat/trunk/java/org/apache/catalina/security/SecurityClassLoad.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/security/SecurityClassLoad.java?rev=1001698&r1=1001697&r2=1001698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/security/SecurityClassLoad.java (original)
+++ tomcat/trunk/java/org/apache/catalina/security/SecurityClassLoad.java Mon Sep 27 12:13:32 2010
@@ -38,6 +38,7 @@ public final class SecurityClassLoad {
         }
         
         loadCorePackage(loader);
+        loadCoyotePackage(loader);
         loadLoaderPackage(loader);
         loadSessionPackage(loader);
         loadUtilPackage(loader);
@@ -64,9 +65,6 @@ public final class SecurityClassLoad {
             "AsyncContextImpl");
         loader.loadClass
             (basePackage +
-            "AsyncContextImpl$AsyncState");
-        loader.loadClass
-            (basePackage +
             "AsyncContextImpl$DebugException");
         loader.loadClass
             (basePackage +
@@ -129,6 +127,13 @@ public final class SecurityClassLoad {
     }
     
     
+    private final static void loadCoyotePackage(ClassLoader loader)
+            throws Exception {
+        String basePackage = "org.apache.coyote.";
+        loader.loadClass(basePackage + "http11.AbstractOutputBuffer$1");
+    }
+
+
     private final static void loadJavaxPackage(ClassLoader loader)
         throws Exception {
         loader.loadClass("javax.servlet.http.Cookie");
@@ -221,13 +226,16 @@ public final class SecurityClassLoad {
     private final static void loadTomcatPackage(ClassLoader loader)
         throws Exception {
         String basePackage = "org.apache.tomcat.";
-        loader.loadClass(basePackage + "util.net.SSLSupport$CipherData");
-        loader.loadClass
-            (basePackage + "util.net.JIoEndpoint$PrivilegedSetTccl");
         // Make sure system property is read at this point
         Class<?> clazz = loader.loadClass(
                 basePackage + "util.http.FastHttpDateFormat");
         clazz.newInstance();
+        loader.loadClass(basePackage + "util.http.HttpMessages");
+        loader.loadClass(basePackage + "util.net.SSLSupport$CipherData");
+        loader.loadClass
+            (basePackage + "util.net.JIoEndpoint$PrivilegedSetTccl");
+        loader.loadClass
+            (basePackage + "util.net.AprEndpoint$PrivilegedSetTccl");
     }
 }
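
Editor's note: the loadClass calls in this file name nested and anonymous classes by their binary names ("Outer$Inner", "Outer$1"). The following is a minimal sketch of that preloading pattern, using JDK classes as stand-ins. The class PreloadSketch and its preload method are hypothetical and are not part of Tomcat.

```java
// Hypothetical sketch (not Tomcat code): preload classes by binary name.
final class PreloadSketch {

    // Loads basePackage + name for every name and returns the Class objects.
    // Nested classes use '$' in their binary names, e.g. "Map$Entry".
    static Class<?>[] preload(ClassLoader loader, String basePackage,
            String... names) {
        Class<?>[] loaded = new Class<?>[names.length];
        for (int i = 0; i < names.length; i++) {
            try {
                loaded[i] = loader.loadClass(basePackage + names[i]);
            } catch (ClassNotFoundException e) {
                // Unchecked here only to keep the sketch short; the methods
                // in the diff declare "throws Exception" instead.
                throw new IllegalStateException(e);
            }
        }
        return loaded;
    }
}
```

Anonymous classes such as "AbstractOutputBuffer$1" are numbered by the compiler, so an entry of that form may need revisiting if anonymous classes are added to or removed from the outer class.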
 

Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?rev=1001698&r1=1001697&r2=1001698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Mon Sep 27 12:13:32 2010
@@ -133,18 +133,61 @@ public enum ActionCode {
 
     /**
      * Callback for an async call to
+     * {@link javax.servlet.AsyncContext#dispatch()}
+     */
+    ASYNC_DISPATCH,
+
+    /**
+     * Callback to indicate that the actual dispatch has started and that the
+     * async state needs to change.
+     */
+    ASYNC_DISPATCHED,
+
+    /**
+     * Callback for an async call to
+     * {@link javax.servlet.AsyncContext#start()}
+     */
+    ASYNC_RUN,
+
+    /**
+     * Callback for an async call to
      * {@link javax.servlet.AsyncContext#complete()}
      */
     ASYNC_COMPLETE,
+    
+    /**
+     * Callback to trigger the processing of an async timeout
+     */
+    ASYNC_TIMEOUT,
+    
+    /**
+     * Callback to trigger the error processing
+     */
+    ASYNC_ERROR,
+    
     /**
      * Callback for an async call to
      * {@link javax.servlet.AsyncContext#setTimeout(long)}
      */
     ASYNC_SETTIMEOUT,
+    
+    /**
+     * Callback to determine if async processing is in progress 
+     */
+    ASYNC_IS_ASYNC,
+    
+    /**
+     * Callback to determine if async processing has been started
+     */
+    ASYNC_IS_STARTED,
 
     /**
-     * Callback for an async call to
-     * {@link javax.servlet.AsyncContext#dispatch()}
+     * Callback to determine if async dispatch is in progress
      */
-    ASYNC_DISPATCH,
+    ASYNC_IS_DISPATCHING,
+
+    /**
+     * Callback to determine if async is timing out
+     */
+    ASYNC_IS_TIMINGOUT
 }
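
Editor's note: the ASYNC_IS_* codes are queries rather than commands. Because action(ActionCode, Object) returns void, the processor code later in this commit writes the answer into an AtomicBoolean passed as the parameter. The following is a minimal sketch of that out-parameter pattern. QueryCode and ActionHookSketch are hypothetical stand-ins, not the Tomcat types.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the query-style ActionCode values.
enum QueryCode { IS_ASYNC, IS_STARTED }

// Hypothetical stand-in for a processor that implements the action hook.
final class ActionHookSketch {
    private volatile boolean async;
    private volatile boolean started;

    void startAsync() {
        async = true;
        started = true;
    }

    // Void hook: the result is written into the caller-supplied holder.
    void action(QueryCode code, Object param) {
        if (code == QueryCode.IS_ASYNC) {
            ((AtomicBoolean) param).set(async);
        } else if (code == QueryCode.IS_STARTED) {
            ((AtomicBoolean) param).set(started);
        }
    }

    // Caller side: allocate the holder, fire the action, read the answer.
    boolean query(QueryCode code) {
        AtomicBoolean result = new AtomicBoolean(false);
        action(code, result);
        return result.get();
    }
}
```

The holder is used here purely as a mutable boolean box; nothing in the sketch relies on its atomic operations.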

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java?rev=1001698&r1=1001697&r2=1001698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java Mon Sep 27 12:13:32 2010
@@ -47,6 +47,7 @@ import org.apache.tomcat.util.net.Abstra
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
 
 
@@ -188,7 +189,7 @@ public class AjpAprProcessor implements 
     /**
      * Socket associated with the current connection.
      */
-    protected long socket;
+    protected SocketWrapper<Long> socket;
 
 
     /**
@@ -355,15 +356,16 @@ public class AjpAprProcessor implements 
      *
      * @throws IOException error during an I/O operation
      */
-    public boolean process(long socket)
+    public boolean process(SocketWrapper<Long> socket)
         throws IOException {
         RequestInfo rp = request.getRequestProcessor();
         rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
 
         // Setting up the socket
         this.socket = socket;
-        Socket.setrbb(this.socket, inputBuffer);
-        Socket.setsbb(this.socket, outputBuffer);
+        long socketRef = socket.getSocket().longValue();
+        Socket.setrbb(socketRef, inputBuffer);
+        Socket.setsbb(socketRef, outputBuffer);
 
         // Error flag
         error = false;
@@ -388,7 +390,7 @@ public class AjpAprProcessor implements 
                 // not regular request processing
                 int type = requestHeaderMessage.getByte();
                 if (type == Constants.JK_AJP13_CPING_REQUEST) {
-                    if (Socket.sendb(socket, pongMessageBuffer, 0,
+                    if (Socket.sendb(socketRef, pongMessageBuffer, 0,
                             pongMessageBuffer.position()) < 0) {
                         error = true;
                     }
@@ -469,7 +471,7 @@ public class AjpAprProcessor implements 
 
         // Add the socket to the poller
         if (!error && !endpoint.isPaused()) {
-            endpoint.getPoller().add(socket);
+            endpoint.getPoller().add(socketRef);
         } else {
             openSocket = false;
         }
@@ -483,7 +485,8 @@ public class AjpAprProcessor implements 
     }
 
     /* Copied from the AjpProcessor.java */
-    public SocketState asyncDispatch(long socket, SocketStatus status) throws IOException {
+    public SocketState asyncDispatch(SocketWrapper<Long> socket,
+            SocketStatus status) throws IOException {
 
         // Setting up the socket
         this.socket = socket;
@@ -535,6 +538,8 @@ public class AjpAprProcessor implements 
      */
     public void action(ActionCode actionCode, Object param) {
 
+        long socketRef = socket.getSocket().longValue();
+        
         if (actionCode == ActionCode.COMMIT) {
 
             if (response.isCommitted())
@@ -564,7 +569,7 @@ public class AjpAprProcessor implements 
             try {
                 flush();
                 // Send explicit flush message
-                if (Socket.sendb(socket, flushMessageBuffer, 0,
+                if (Socket.sendb(socketRef, flushMessageBuffer, 0,
                                  flushMessageBuffer.position()) < 0) {
                     error = true;                    
                 }
@@ -661,14 +666,14 @@ public class AjpAprProcessor implements 
             }        
         } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
             if (param==null) return;
-            if (socket==0) return;
+            if (socketRef==0) return;
             long timeout = ((Long)param).longValue();
-            Socket.timeoutSet(socket, timeout * 1000); 
+            Socket.timeoutSet(socketRef, timeout * 1000); 
         } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
            RequestInfo rp = request.getRequestProcessor();
             AtomicBoolean dispatch = (AtomicBoolean)param;
             if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
-                endpoint.getPoller().add(this.socket);
+                endpoint.getPoller().add(socketRef);
                 dispatch.set(true);
             } else {
                 dispatch.set(true);
@@ -1127,7 +1132,7 @@ public class AjpAprProcessor implements 
         int nRead;
         while (inputBuffer.remaining() < n) {
             nRead = Socket.recvbb
-                (socket, inputBuffer.limit(),
+                (socket.getSocket().longValue(), inputBuffer.limit(),
                         inputBuffer.capacity() - inputBuffer.limit());
             if (nRead > 0) {
                 inputBuffer.limit(inputBuffer.limit() + nRead);
@@ -1160,7 +1165,7 @@ public class AjpAprProcessor implements 
         int nRead;
         while (inputBuffer.remaining() < n) {
             nRead = Socket.recvbb
-                (socket, inputBuffer.limit(),
+                (socket.getSocket().longValue(), inputBuffer.limit(),
                     inputBuffer.capacity() - inputBuffer.limit());
             if (nRead > 0) {
                 inputBuffer.limit(inputBuffer.limit() + nRead);
@@ -1224,7 +1229,7 @@ public class AjpAprProcessor implements 
         }
 
         // Request more data immediately
-        Socket.sendb(socket, getBodyMessageBuffer, 0,
+        Socket.sendb(socket.getSocket().longValue(), getBodyMessageBuffer, 0,
                 getBodyMessageBuffer.position());
 
         boolean moreData = receive();
@@ -1305,7 +1310,7 @@ public class AjpAprProcessor implements 
     protected void flush()
         throws IOException {
         if (outputBuffer.position() > 0) {
-            if (Socket.sendbb(socket, 0, outputBuffer.position()) < 0) {
+            if (Socket.sendbb(socket.getSocket().longValue(), 0, outputBuffer.position()) < 0) {
                 throw new IOException(sm.getString("ajpprocessor.failedsend"));
             }
             outputBuffer.clear();

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java?rev=1001698&r1=1001697&r2=1001698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java Mon Sep 27 12:13:32 2010
@@ -42,6 +42,7 @@ import org.apache.tomcat.util.modeler.Re
 import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.AprEndpoint.Handler;
 import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
 
 
@@ -337,8 +338,8 @@ public class AjpAprProtocol 
         protected AtomicLong registerCount = new AtomicLong(0);
         protected RequestGroupInfo global = new RequestGroupInfo();
 
-        protected ConcurrentHashMap<Long, AjpAprProcessor> connections =
-            new ConcurrentHashMap<Long, AjpAprProcessor>();
+        protected ConcurrentHashMap<SocketWrapper<Long>, AjpAprProcessor> connections =
+            new ConcurrentHashMap<SocketWrapper<Long>, AjpAprProcessor>();
 
         protected ConcurrentLinkedQueue<AjpAprProcessor> recycledProcessors = 
             new ConcurrentLinkedQueue<AjpAprProcessor>() {
@@ -384,11 +385,11 @@ public class AjpAprProtocol 
         }
 
         // FIXME: Support for this could be added in AJP as well
-        public SocketState event(long socket, SocketStatus status) {
+        public SocketState event(SocketWrapper<Long> socket, SocketStatus status) {
             return SocketState.CLOSED;
         }
         
-        public SocketState process(long socket) {
+        public SocketState process(SocketWrapper<Long> socket) {
             AjpAprProcessor processor = recycledProcessors.poll();
             try {
 
@@ -397,7 +398,7 @@ public class AjpAprProtocol 
                 }
 
                 if (processor.process(socket)) {
-                    connections.put(Long.valueOf(socket), processor);
+                    connections.put(socket, processor);
                     return SocketState.OPEN;
                 } else {
                     // recycledProcessors.offer(processor);
@@ -431,9 +432,9 @@ public class AjpAprProtocol 
         }
 
         // FIXME: Support for this could be added in AJP as well
-        public SocketState asyncDispatch(long socket, SocketStatus status) {
+        public SocketState asyncDispatch(SocketWrapper<Long> socket, SocketStatus status) {
 
-            AjpAprProcessor result = connections.get(Long.valueOf(socket));
+            AjpAprProcessor result = connections.get(socket);
             
             SocketState state = SocketState.CLOSED; 
             if (result != null) {
@@ -462,10 +463,10 @@ public class AjpAprProtocol 
                         (sm.getString("ajpprotocol.proto.error"), e);
                 } finally {
                     if (state != SocketState.LONG) {
-                        connections.remove(Long.valueOf(socket));
+                        connections.remove(socket);
                         recycledProcessors.offer(result);
                         if (state == SocketState.OPEN) {
-                            proto.endpoint.getPoller().add(socket);
+                            proto.endpoint.getPoller().add(socket.getSocket().longValue());
                         }
                     }
                 }
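
Editor's note: the connections map is now keyed by the SocketWrapper<Long> itself instead of by Long.valueOf(socket). The following sketch shows what that implies if the wrapper does not override equals()/hashCode(). That is an assumption, since SocketWrapper is not shown in this part of the diff. WrapperSketch and ConnectionMapSketch are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical wrapper; assumed NOT to override equals()/hashCode().
final class WrapperSketch<E> {
    private final E socket;

    WrapperSketch(E socket) {
        this.socket = socket;
    }

    E getSocket() {
        return socket;
    }
}

// Hypothetical registry mirroring the "connections" map in the handler.
final class ConnectionMapSketch {
    private final ConcurrentHashMap<WrapperSketch<Long>, String> connections =
            new ConcurrentHashMap<WrapperSketch<Long>, String>();

    void register(WrapperSketch<Long> socket, String processor) {
        connections.put(socket, processor);
    }

    // With identity-based keys, only the same wrapper instance finds the entry.
    String lookup(WrapperSketch<Long> socket) {
        return connections.get(socket);
    }
}
```

Under that assumption, a lookup succeeds only when the caller holds the same wrapper instance that was registered, so the wrapper has to be passed along with the socket rather than recreated from the raw long.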

Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java?rev=1001698&r1=1001697&r2=1001698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java Mon Sep 27 12:13:32 2010
@@ -17,11 +17,15 @@
 package org.apache.coyote.http11;
 
 import java.io.IOException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.StringTokenizer;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
+import org.apache.catalina.core.AsyncContextImpl;
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.Adapter;
 import org.apache.coyote.Request;
@@ -43,6 +47,7 @@ import org.apache.tomcat.util.buf.Messag
 import org.apache.tomcat.util.http.FastHttpDateFormat;
 import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.res.StringManager;
 
 public abstract class AbstractHttp11Processor {
@@ -236,12 +241,6 @@ public abstract class AbstractHttp11Proc
 
     
     /**
-     * Async used
-     */
-    protected boolean async = false;
-
-
-    /**
      * Set compression level.
      */
     public void setCompression(String compression) {
@@ -904,7 +903,24 @@ public abstract class AbstractHttp11Proc
                 request.getInputBuffer();
             internalBuffer.addActiveFilter(savedBody);
         } else if (actionCode == ActionCode.ASYNC_START) {
-            async = true;
+            asyncStart((AsyncContextImpl) param);
+        } else if (actionCode == ActionCode.ASYNC_DISPATCHED) {
+            asyncDispatched();
+        } else if (actionCode == ActionCode.ASYNC_TIMEOUT) {
+            AtomicBoolean result = (AtomicBoolean) param;
+            result.set(asyncTimeout());
+        } else if (actionCode == ActionCode.ASYNC_RUN) {
+            asyncRun((Runnable) param);
+        } else if (actionCode == ActionCode.ASYNC_ERROR) {
+            asyncError();
+        } else if (actionCode == ActionCode.ASYNC_IS_STARTED) {
+            ((AtomicBoolean) param).set(isAsyncStarted());
+        } else if (actionCode == ActionCode.ASYNC_IS_DISPATCHING) {
+            ((AtomicBoolean) param).set(isAsyncDispatching());
+        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
+            ((AtomicBoolean) param).set(isAsync());
+        } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
+            ((AtomicBoolean) param).set(isAsyncTimingOut());
         } else {
             actionInternal(actionCode, param);
         }
@@ -1086,10 +1102,276 @@ public abstract class AbstractHttp11Proc
     public final void recycle() {
         getInputBuffer().recycle();
         getOutputBuffer().recycle();
+        asyncCtxt = null;
         recycleInternal();
     }
     
     protected abstract void recycleInternal();
     
     protected abstract Executor getExecutor();
+    
+    // -------------------------------------------------- Async state management
+    
+    /*
+     * DISPATCHED    - Standard request. Not in Async mode.
+     * STARTING      - ServletRequest.startAsync() has been called but the
+     *                 request in which that call was made has not finished
+     *                 processing.
+     * STARTED       - ServletRequest.startAsync() has been called and the
+     *                 request in which that call was made has finished
+     *                 processing.
+     * MUST_COMPLETE - complete() has been called before the request in which
+     *                 startAsync() was called has finished. As soon as that
+     *                 request finishes, the complete() will be processed.
+     * COMPLETING    - The call to complete() was made once the request was in
+     *                 the STARTED state. May or may not be triggered by a
+     *                 container thread - depends if start(Runnable) was used
+     * 
+     * TODO - markt - Move this to a separate class
+     */
+    private static enum AsyncState {
+        DISPATCHED(false, false, false),
+        STARTING(true, true, false),
+        STARTED(true, true, false),
+        MUST_COMPLETE(true, false, false),
+        COMPLETING(true, false, false),
+        TIMING_OUT(true, false, false),
+        MUST_DISPATCH(true, false, true),
+        DISPATCHING(true, false, true),
+        ERROR(true,false,false);
+    
+        private boolean isAsync;
+        private boolean isStarted;
+        private boolean isDispatching;
+        
+        private AsyncState(boolean isAsync, boolean isStarted,
+                boolean isDispatching) {
+            this.isAsync = isAsync;
+            this.isStarted = isStarted;
+            this.isDispatching = isDispatching;
+        }
+        
+        public boolean isAsync() {
+            return this.isAsync;
+        }
+        
+        public boolean isStarted() {
+            return this.isStarted;
+        }
+        
+        public boolean isDispatching() {
+            return this.isDispatching;
+        }
+    }
+    
+    private volatile AsyncState state = AsyncState.DISPATCHED;
+    // Need this to fire listener on complete
+    private AsyncContextImpl asyncCtxt = null;
+    
+    protected boolean isAsync() {
+        return state.isAsync();
+    }
+
+    protected boolean isAsyncDispatching() {
+        return state.isDispatching();
+    }
+
+    protected boolean isAsyncStarted() {
+        return state.isStarted();
+    }
+
+    protected boolean isAsyncTimingOut() {
+        return state == AsyncState.TIMING_OUT;
+    }
+
+
+    private synchronized void asyncStart(AsyncContextImpl asyncCtxt) {
+        if (state == AsyncState.DISPATCHED) {
+            state = AsyncState.STARTING;
+            this.asyncCtxt = asyncCtxt;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "startAsync()", state));
+        }
+    }
+    
+    /*
+     * Async has been processed. Whether or not to enter a long poll depends on
+     * current state. For example, as per SRV.2.3.3.3 can now process calls to
+     * complete() or dispatch().
+     */
+    protected synchronized SocketState asyncPostProcess() {
+        
+        if (state == AsyncState.STARTING) {
+            state = AsyncState.STARTED;
+            return SocketState.LONG;
+        } else if (state == AsyncState.MUST_COMPLETE) {
+            asyncCtxt.fireOnComplete();
+            state = AsyncState.DISPATCHED;
+            return SocketState.ASYNC_END;
+        } else if (state == AsyncState.COMPLETING) {
+            state = AsyncState.DISPATCHED;
+            return SocketState.ASYNC_END;
+        } else if (state == AsyncState.MUST_DISPATCH) {
+            state = AsyncState.DISPATCHING;
+            return SocketState.ASYNC_END;
+        } else if (state == AsyncState.DISPATCHING) {
+            state = AsyncState.DISPATCHED;
+            return SocketState.ASYNC_END;
+        } else if (state == AsyncState.ERROR) {
+            asyncCtxt.fireOnComplete();
+            state = AsyncState.DISPATCHED;
+            return SocketState.ASYNC_END;
+        //} else if (state == AsyncState.DISPATCHED) {
+        //    // No state change
+        //    return SocketState.OPEN;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "asyncLongPoll()", state));
+        }
+    }
+    
+
+    protected synchronized boolean asyncComplete() {
+        boolean doComplete = false;
+        
+        if (state == AsyncState.STARTING) {
+            state = AsyncState.MUST_COMPLETE;
+        } else if (state == AsyncState.STARTED) {
+            state = AsyncState.COMPLETING;
+            doComplete = true;
+        } else if (state == AsyncState.TIMING_OUT ||
+                state == AsyncState.ERROR) {
+            state = AsyncState.MUST_COMPLETE;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "asyncComplete()", state));
+            
+        }
+        return doComplete;
+    }
+    
+    
+    private synchronized boolean asyncTimeout() {
+        if (state == AsyncState.STARTED) {
+            state = AsyncState.TIMING_OUT;
+            return true;
+        } else if (state == AsyncState.COMPLETING ||
+                state == AsyncState.DISPATCHED) {
+            // NOOP - App called complete between the timeout firing and
+            // execution reaching this point
+            return false;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "timeoutAsync()", state));
+        }
+    }
+    
+    
+    protected synchronized boolean asyncDispatch() {
+        boolean doDispatch = false;
+        if (state == AsyncState.STARTING) {
+            state = AsyncState.MUST_DISPATCH;
+        } else if (state == AsyncState.STARTED) {
+            state = AsyncState.DISPATCHING;
+            doDispatch = true;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "dispatchAsync()", state));
+        }
+        return doDispatch;
+    }
+    
+    
+    private synchronized void asyncDispatched() {
+        if (state == AsyncState.DISPATCHING) {
+            state = AsyncState.DISPATCHED;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "asyncDispatched()", state));
+        }
+    }
+    
+    
+    private synchronized boolean asyncError() {
+        boolean doDispatch = false;
+        if (state == AsyncState.DISPATCHED ||
+                state == AsyncState.TIMING_OUT) {
+            state = AsyncState.ERROR;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "asyncError()", state));
+        }
+        return doDispatch;
+    }
+    
+    private synchronized void asyncRun(Runnable runnable) {
+        if (state == AsyncState.STARTING || state ==  AsyncState.STARTED) {
+            // Execute the runnable using a container thread from the
+            // Connector's thread pool. Use a wrapper to prevent a memory leak
+            ClassLoader oldCL;
+            if (Constants.IS_SECURITY_ENABLED) {
+                PrivilegedAction<ClassLoader> pa = new PrivilegedGetTccl();
+                oldCL = AccessController.doPrivileged(pa);
+            } else {
+                oldCL = Thread.currentThread().getContextClassLoader();
+            }
+            try {
+                if (Constants.IS_SECURITY_ENABLED) {
+                    PrivilegedAction<Void> pa = new PrivilegedSetTccl(
+                            this.getClass().getClassLoader());
+                    AccessController.doPrivileged(pa);
+                } else {
+                    Thread.currentThread().setContextClassLoader(
+                            this.getClass().getClassLoader());
+                }
+                
+                getExecutor().execute(runnable);
+            } finally {
+                if (Constants.IS_SECURITY_ENABLED) {
+                    PrivilegedAction<Void> pa = new PrivilegedSetTccl(
+                            oldCL);
+                    AccessController.doPrivileged(pa);
+                } else {
+                    Thread.currentThread().setContextClassLoader(oldCL);
+                }
+            }
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "runAsync()", state));
+        }
+
+    }
+    
+    private static class PrivilegedSetTccl implements PrivilegedAction<Void> {
+
+        private ClassLoader cl;
+
+        PrivilegedSetTccl(ClassLoader cl) {
+            this.cl = cl;
+        }
+
+        @Override
+        public Void run() {
+            Thread.currentThread().setContextClassLoader(cl);
+            return null;
+        }
+    }
+
+    private static class PrivilegedGetTccl
+            implements PrivilegedAction<ClassLoader> {
+
+        @Override
+        public ClassLoader run() {
+            return Thread.currentThread().getContextClassLoader();
+        }
+    }
 }
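
Editor's note: the key behaviour added by the AsyncState machine is that a complete() arriving while the request that called startAsync() is still being processed is deferred (MUST_COMPLETE) and only acted on in asyncPostProcess(). The following is a reduced sketch of that deferral with five of the nine states. AsyncStateSketch is a hypothetical class; listener callbacks, dispatch, timeout and error handling are omitted.

```java
// Hypothetical reduction of the AsyncState machine in the diff above.
final class AsyncStateSketch {
    enum State { DISPATCHED, STARTING, STARTED, MUST_COMPLETE, COMPLETING }

    private State state = State.DISPATCHED;

    synchronized State getState() {
        return state;
    }

    synchronized void start() {
        require(state == State.DISPATCHED, "start");
        state = State.STARTING;
    }

    // Returns true if the caller should process the completion right away.
    synchronized boolean complete() {
        if (state == State.STARTING) {
            // The starting request is still running: remember and defer.
            state = State.MUST_COMPLETE;
            return false;
        }
        require(state == State.STARTED, "complete");
        state = State.COMPLETING;
        return true;
    }

    // Called once the request that called start() has finished processing.
    // Returns true if the socket should enter a long poll.
    synchronized boolean postProcess() {
        if (state == State.STARTING) {
            state = State.STARTED;
            return true;
        }
        require(state == State.MUST_COMPLETE || state == State.COMPLETING,
                "postProcess");
        state = State.DISPATCHED;
        return false;
    }

    private void require(boolean ok, String operation) {
        if (!ok) {
            throw new IllegalStateException(
                    operation + " is not valid in state " + state);
        }
    }
}
```

All transitions are synchronized on the same object, which matches the commit's stated goal of allowing only one async state change at a time.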

Modified: tomcat/trunk/java/org/apache/coyote/http11/Constants.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Constants.java?rev=1001698&r1=1001697&r2=1001698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Constants.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Constants.java Mon Sep 27 12:13:32 2010
@@ -211,5 +211,9 @@ public final class Constants {
      */
     public static final String POST = "POST";
 
-
+    /**
+     * Has security been turned on?
+     */
+    public static final boolean IS_SECURITY_ENABLED =
+        (System.getSecurityManager() != null);
 }
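
Editor's note: IS_SECURITY_ENABLED is consulted by asyncRun() in AbstractHttp11Processor to decide whether reading and setting the thread context class loader (TCCL) goes through AccessController.doPrivileged(). The following sketch shows only the swap-and-restore shape and leaves out the privileged branch. TcclSwapSketch is a hypothetical helper, not Tomcat code.

```java
// Hypothetical helper showing the TCCL swap-and-restore shape used in
// asyncRun(); the security manager branch is deliberately left out.
final class TcclSwapSketch {

    // Runs the task with the given context class loader and always restores
    // the previous one, even if the task throws.
    static void runWith(ClassLoader loader, Runnable task) {
        Thread current = Thread.currentThread();
        ClassLoader old = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            task.run();
        } finally {
            current.setContextClassLoader(old);
        }
    }
}
```

In the diff, the step performed inside the swap is getExecutor().execute(runnable). A newly created thread inherits its parent's context class loader, so the swap presumably keeps any worker thread the executor creates from holding a reference to a web application class loader; that reading of the "memory leak" comment is an inference, not something the commit states.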

Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java?rev=1001698&r1=1001697&r2=1001698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11AprProcessor.java Mon Sep 27 12:13:32 2010
@@ -24,7 +24,6 @@ import java.security.cert.CertificateFac
 import java.security.cert.X509Certificate;
 import java.util.Locale;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.ActionHook;
@@ -47,6 +46,7 @@ import org.apache.tomcat.util.net.Abstra
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
 
 
 /**
@@ -124,7 +124,7 @@ public class Http11AprProcessor extends 
     /**
      * Socket associated with the current connection.
      */
-    protected long socket = 0;
+    protected SocketWrapper<Long> socket = null;
 
 
     /**
@@ -186,7 +186,7 @@ public class Http11AprProcessor extends 
      *
      * @throws IOException error during an I/O operation
      */
-    public SocketState process(long socket)
+    public SocketState process(SocketWrapper<Long> socket)
         throws IOException {
         RequestInfo rp = request.getRequestProcessor();
         rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
@@ -201,13 +201,13 @@ public class Http11AprProcessor extends 
 
         // Setting up the socket
         this.socket = socket;
-        inputBuffer.setSocket(socket);
-        outputBuffer.setSocket(socket);
+        long socketRef = socket.getSocket().longValue();
+        inputBuffer.setSocket(socketRef);
+        outputBuffer.setSocket(socketRef);
 
         // Error flag
         error = false;
         comet = false;
-        async = false;
         keepAlive = true;
 
         int keepAliveLeft = maxKeepAliveRequests;
@@ -216,12 +216,12 @@ public class Http11AprProcessor extends 
         boolean keptAlive = false;
         boolean openSocket = false;
 
-        while (!error && keepAlive && !comet && !async && !endpoint.isPaused()) {
+        while (!error && keepAlive && !comet && !isAsync() && !endpoint.isPaused()) {
 
             // Parsing the request header
             try {
                 if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) {
-                    Socket.timeoutSet(socket, soTimeout * 1000);
+                    Socket.timeoutSet(socketRef, soTimeout * 1000);
                 }
                 if (!inputBuffer.parseRequestLine(keptAlive)) {
                     // This means that no data is available right now
@@ -229,13 +229,13 @@ public class Http11AprProcessor extends 
                     // and the method should return true
                     openSocket = true;
                     // Add the socket to the poller
-                    endpoint.getPoller().add(socket);
+                    endpoint.getPoller().add(socketRef);
                     break;
                 }
                 request.setStartTime(System.currentTimeMillis());
                 keptAlive = true;
                 if (!disableUploadTimeout) {
-                    Socket.timeoutSet(socket, timeout * 1000);
+                    Socket.timeoutSet(socketRef, timeout * 1000);
                 }
                 inputBuffer.parseHeaders();
             } catch (IOException e) {
@@ -296,7 +296,7 @@ public class Http11AprProcessor extends 
             }
 
             // Finish the handling of the request
-            if (!comet && !async) {
+            if (!comet && !isAsync()) {
                 // If we know we are closing the connection, don't drain input.
                 // This way uploading a 100GB file doesn't tie up the thread 
                 // if the servlet has rejected it.
@@ -312,7 +312,7 @@ public class Http11AprProcessor extends 
             }
             request.updateCounters();
 
-            if (!comet && !async) {
+            if (!comet && !isAsync()) {
                 // Next request
                 inputBuffer.nextRequest();
                 outputBuffer.nextRequest();
@@ -320,7 +320,7 @@ public class Http11AprProcessor extends 
             
             // Do sendfile as needed: add socket to sendfile and end
             if (sendfileData != null && !error) {
-                sendfileData.socket = socket;
+                sendfileData.socket = socketRef;
                 sendfileData.keepAlive = keepAlive;
                 if (!endpoint.getSendfile().add(sendfileData)) {
                     openSocket = true;
@@ -335,11 +335,9 @@ public class Http11AprProcessor extends 
         rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
 
         if (error || endpoint.isPaused()) {
-            inputBuffer.nextRequest();
-            outputBuffer.nextRequest();
             recycle();
             return SocketState.CLOSED;
-        } else if (comet  || async) {
+        } else if (comet  || isAsync()) {
             return SocketState.LONG;
         } else {
             recycle();
@@ -349,12 +347,14 @@ public class Http11AprProcessor extends 
     }
 
     /* Copied from the AjpProcessor.java */
-    public SocketState asyncDispatch(long socket, SocketStatus status) {
+    public SocketState asyncDispatch(SocketWrapper<Long> socket,
+            SocketStatus status) {
 
         // Setting up the socket
         this.socket = socket;
-        inputBuffer.setSocket(socket);
-        outputBuffer.setSocket(socket);
+        long socketRef = socket.getSocket().longValue();
+        inputBuffer.setSocket(socketRef);
+        outputBuffer.setSocket(socketRef);
 
         RequestInfo rp = request.getRequestProcessor();
         try {
@@ -372,30 +372,25 @@ public class Http11AprProcessor extends 
 
         rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
 
-        if (async) {
-            if (error) {
-                response.setStatus(500);
-                request.updateCounters();
-                recycle();
+        if (error) {
+            recycle();
+            return SocketState.CLOSED;
+        } else if (isAsync()) {
+            return SocketState.LONG;
+        } else {
+            recycle();
+            if (!keepAlive) {
                 return SocketState.CLOSED;
             } else {
-                return SocketState.LONG;
+                return SocketState.OPEN;
             }
-        } else {
-            if (error) {
-                response.setStatus(500);
-            }
-            request.updateCounters();
-            recycle();
-            return SocketState.CLOSED;
         }
-        
     }
 
 
     @Override
     public void recycleInternal() {
-        this.socket = 0;
+        this.socket = null;
     }
     
 
@@ -411,6 +406,8 @@ public class Http11AprProcessor extends 
     @Override
     public void actionInternal(ActionCode actionCode, Object param) {
 
+        long socketRef = socket.getSocket().longValue();
+        
         if (actionCode == ActionCode.CLOSE) {
             // Close
 
@@ -418,7 +415,6 @@ public class Http11AprProcessor extends 
             // transactions with the client
 
             comet = false;
-            async = false;
             try {
                 outputBuffer.endRequest();
             } catch (IOException e) {
@@ -429,9 +425,9 @@ public class Http11AprProcessor extends 
         } else if (actionCode == ActionCode.REQ_HOST_ADDR_ATTRIBUTE) {
 
             // Get remote host address
-            if (remoteAddr == null && (socket != 0)) {
+            if (remoteAddr == null && (socketRef != 0)) {
                 try {
-                    long sa = Address.get(Socket.APR_REMOTE, socket);
+                    long sa = Address.get(Socket.APR_REMOTE, socketRef);
                     remoteAddr = Address.getip(sa);
                 } catch (Exception e) {
                     log.warn(sm.getString("http11processor.socket.info"), e);
@@ -442,9 +438,9 @@ public class Http11AprProcessor extends 
         } else if (actionCode == ActionCode.REQ_LOCAL_NAME_ATTRIBUTE) {
 
             // Get local host name
-            if (localName == null && (socket != 0)) {
+            if (localName == null && (socketRef != 0)) {
                 try {
-                    long sa = Address.get(Socket.APR_LOCAL, socket);
+                    long sa = Address.get(Socket.APR_LOCAL, socketRef);
                     localName = Address.getnameinfo(sa, 0);
                 } catch (Exception e) {
                     log.warn(sm.getString("http11processor.socket.info"), e);
@@ -455,9 +451,9 @@ public class Http11AprProcessor extends 
         } else if (actionCode == ActionCode.REQ_HOST_ATTRIBUTE) {
 
             // Get remote host name
-            if (remoteHost == null && (socket != 0)) {
+            if (remoteHost == null && (socketRef != 0)) {
                 try {
-                    long sa = Address.get(Socket.APR_REMOTE, socket);
+                    long sa = Address.get(Socket.APR_REMOTE, socketRef);
                     remoteHost = Address.getnameinfo(sa, 0);
                 } catch (Exception e) {
                     log.warn(sm.getString("http11processor.socket.info"), e);
@@ -468,9 +464,9 @@ public class Http11AprProcessor extends 
         } else if (actionCode == ActionCode.REQ_LOCAL_ADDR_ATTRIBUTE) {
 
             // Get local host address
-            if (localAddr == null && (socket != 0)) {
+            if (localAddr == null && (socketRef != 0)) {
                 try {
-                    long sa = Address.get(Socket.APR_LOCAL, socket);
+                    long sa = Address.get(Socket.APR_LOCAL, socketRef);
                     localAddr = Address.getip(sa);
                 } catch (Exception e) {
                     log.warn(sm.getString("http11processor.socket.info"), e);
@@ -482,9 +478,9 @@ public class Http11AprProcessor extends 
         } else if (actionCode == ActionCode.REQ_REMOTEPORT_ATTRIBUTE) {
 
             // Get remote port
-            if (remotePort == -1 && (socket != 0)) {
+            if (remotePort == -1 && (socketRef != 0)) {
                 try {
-                    long sa = Address.get(Socket.APR_REMOTE, socket);
+                    long sa = Address.get(Socket.APR_REMOTE, socketRef);
                     Sockaddr addr = Address.getInfo(sa);
                     remotePort = addr.port;
                 } catch (Exception e) {
@@ -496,9 +492,9 @@ public class Http11AprProcessor extends 
         } else if (actionCode == ActionCode.REQ_LOCALPORT_ATTRIBUTE) {
 
             // Get local port
-            if (localPort == -1 && (socket != 0)) {
+            if (localPort == -1 && (socketRef != 0)) {
                 try {
-                    long sa = Address.get(Socket.APR_LOCAL, socket);
+                    long sa = Address.get(Socket.APR_LOCAL, socketRef);
                     Sockaddr addr = Address.getInfo(sa);
                     localPort = addr.port;
                 } catch (Exception e) {
@@ -509,24 +505,24 @@ public class Http11AprProcessor extends 
 
         } else if (actionCode == ActionCode.REQ_SSL_ATTRIBUTE ) {
 
-            if (ssl && (socket != 0)) {
+            if (ssl && (socketRef != 0)) {
                 try {
                     // Cipher suite
-                    Object sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_CIPHER);
+                    Object sslO = SSLSocket.getInfoS(socketRef, SSL.SSL_INFO_CIPHER);
                     if (sslO != null) {
                         request.setAttribute(AbstractEndpoint.CIPHER_SUITE_KEY, sslO);
                     }
                     // Get client certificate and the certificate chain if present
                     // certLength == -1 indicates an error
-                    int certLength = SSLSocket.getInfoI(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN);
-                    byte[] clientCert = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT);
+                    int certLength = SSLSocket.getInfoI(socketRef, SSL.SSL_INFO_CLIENT_CERT_CHAIN);
+                    byte[] clientCert = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT);
                     X509Certificate[] certs = null;
                     if (clientCert != null  && certLength > -1) {
                         certs = new X509Certificate[certLength + 1];
                         CertificateFactory cf = CertificateFactory.getInstance("X.509");
                         certs[0] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(clientCert));
                         for (int i = 0; i < certLength; i++) {
-                            byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
+                            byte[] data = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
                             certs[i+1] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(data));
                         }
                     }
@@ -534,12 +530,12 @@ public class Http11AprProcessor extends 
                         request.setAttribute(AbstractEndpoint.CERTIFICATE_KEY, certs);
                     }
                     // User key size
-                    sslO = Integer.valueOf(SSLSocket.getInfoI(socket,
+                    sslO = Integer.valueOf(SSLSocket.getInfoI(socketRef,
                             SSL.SSL_INFO_CIPHER_USEKEYSIZE));
                     request.setAttribute(AbstractEndpoint.KEY_SIZE_KEY, sslO);
 
                     // SSL session ID
-                    sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_SESSION_ID);
+                    sslO = SSLSocket.getInfoS(socketRef, SSL.SSL_INFO_SESSION_ID);
                     if (sslO != null) {
                         request.setAttribute(AbstractEndpoint.SESSION_ID_KEY, sslO);
                     }
@@ -552,7 +548,7 @@ public class Http11AprProcessor extends 
 
         } else if (actionCode == ActionCode.REQ_SSL_CERTIFICATE) {
 
-            if (ssl && (socket != 0)) {
+            if (ssl && (socketRef != 0)) {
                 // Consume and buffer the request body, so that it does not
                 // interfere with the client's handshake messages
                 InputFilter[] inputFilters = inputBuffer.getFilters();
@@ -560,22 +556,22 @@ public class Http11AprProcessor extends 
                 inputBuffer.addActiveFilter(inputFilters[Constants.BUFFERED_FILTER]);
                 try {
                     // Configure connection to require a certificate
-                    SSLSocket.setVerify(socket, SSL.SSL_CVERIFY_REQUIRE,
+                    SSLSocket.setVerify(socketRef, SSL.SSL_CVERIFY_REQUIRE,
                             endpoint.getSSLVerifyDepth());
                     // Renegotiate certificates
-                    if (SSLSocket.renegotiate(socket) == 0) {
+                    if (SSLSocket.renegotiate(socketRef) == 0) {
                         // Don't look for certs unless we know renegotiation worked.
                         // Get client certificate and the certificate chain if present
                         // certLength == -1 indicates an error 
-                        int certLength = SSLSocket.getInfoI(socket,SSL.SSL_INFO_CLIENT_CERT_CHAIN);
-                        byte[] clientCert = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT);
+                        int certLength = SSLSocket.getInfoI(socketRef,SSL.SSL_INFO_CLIENT_CERT_CHAIN);
+                        byte[] clientCert = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT);
                         X509Certificate[] certs = null;
                         if (clientCert != null && certLength > -1) {
                             certs = new X509Certificate[certLength + 1];
                             CertificateFactory cf = CertificateFactory.getInstance("X.509");
                             certs[0] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(clientCert));
                             for (int i = 0; i < certLength; i++) {
-                                byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
+                                byte[] data = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
                                 certs[i+1] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(data));
                             }
                         }
@@ -599,29 +595,16 @@ public class Http11AprProcessor extends 
         } else if (actionCode == ActionCode.COMET_SETTIMEOUT) {
             //no op
         } else if (actionCode == ActionCode.ASYNC_COMPLETE) {
-          //TODO SERVLET3 - async - that is bit hacky -
-            AtomicBoolean dispatch = (AtomicBoolean)param;
-            RequestInfo rp = request.getRequestProcessor();
-            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling
-                dispatch.set(true);
-                endpoint.getHandler().asyncDispatch(this.socket, SocketStatus.STOP);
-            } else {
-                dispatch.set(false);
+            if (asyncComplete()) {
+                endpoint.processSocketAsync(this.socket, SocketStatus.OPEN);
             }
         } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
-          //TODO SERVLET3 - async
             if (param==null) return;
-            if (socket==0) return;
             long timeout = ((Long)param).longValue();
-            Socket.timeoutSet(socket, timeout * 1000);
+            socket.setTimeout(timeout);
         } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
-            RequestInfo rp = request.getRequestProcessor();
-            AtomicBoolean dispatch = (AtomicBoolean)param;
-            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
-                endpoint.getPoller().add(this.socket);
-                dispatch.set(true);
-            } else {
-                dispatch.set(true);
+            if (asyncDispatch()) {
+                endpoint.processSocketAsync(this.socket, SocketStatus.OPEN);
             }
         }
         

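For readers following the asyncDispatch() hunk above, the new return logic can be read as a three-way decision on the error flag, the async state and keep-alive. The following is only a minimal sketch of that decision, not the Tomcat code: the class name AsyncDispatchOutcome, the method decide() and the local SocketState enum are hypothetical stand-ins, and the recycle() calls made by the real method in the error and non-async branches are omitted.

```java
// Hypothetical stand-alone sketch; not part of the commit.
public class AsyncDispatchOutcome {

    // Local stand-in for the three socket states used by the hunk above.
    enum SocketState { OPEN, CLOSED, LONG }

    // Mirrors the branch order of the rewritten asyncDispatch():
    // an error closes the socket, an ongoing async request keeps it
    // in LONG, otherwise keep-alive decides between OPEN and CLOSED.
    static SocketState decide(boolean error, boolean async, boolean keepAlive) {
        if (error) {
            return SocketState.CLOSED;
        } else if (async) {
            return SocketState.LONG;
        } else {
            return keepAlive ? SocketState.OPEN : SocketState.CLOSED;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true, true));    // error wins over async
        System.out.println(decide(false, true, false));  // still async
        System.out.println(decide(false, false, true));  // finished, keep-alive
        System.out.println(decide(false, false, false)); // finished, no keep-alive
    }
}
```

Compared with the removed code, the visible difference is that a completed, non-async dispatch on a keep-alive connection now yields OPEN rather than CLOSED.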
Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java?rev=1001698&r1=1001697&r2=1001698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11AprProtocol.java Mon Sep 27 12:13:32 2010
@@ -17,6 +17,8 @@
 
 package org.apache.coyote.http11;
 
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -32,6 +34,7 @@ import org.apache.tomcat.util.modeler.Re
 import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.AprEndpoint.Handler;
 import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
 
 
@@ -246,8 +249,8 @@ public class Http11AprProtocol extends A
         protected AtomicLong registerCount = new AtomicLong(0);
         protected RequestGroupInfo global = new RequestGroupInfo();
         
-        protected ConcurrentHashMap<Long, Http11AprProcessor> connections =
-            new ConcurrentHashMap<Long, Http11AprProcessor>();
+        protected ConcurrentHashMap<SocketWrapper<Long>, Http11AprProcessor> connections =
+            new ConcurrentHashMap<SocketWrapper<Long>, Http11AprProcessor>();
         protected ConcurrentLinkedQueue<Http11AprProcessor> recycledProcessors = 
             new ConcurrentLinkedQueue<Http11AprProcessor>() {
             private static final long serialVersionUID = 1L;
@@ -294,8 +297,8 @@ public class Http11AprProtocol extends A
         }
 
         @Override
-        public SocketState event(long socket, SocketStatus status) {
-            Http11AprProcessor result = connections.get(Long.valueOf(socket));
+        public SocketState event(SocketWrapper<Long> socket, SocketStatus status) {
+            Http11AprProcessor result = connections.get(socket);
             
             SocketState state = SocketState.CLOSED; 
             if (result != null) {
@@ -324,16 +327,16 @@ public class Http11AprProtocol extends A
                                 "http11protocol.proto.error"), e);
                     } finally {
                         if (state != SocketState.LONG) {
-                            connections.remove(Long.valueOf(socket));
+                            connections.remove(socket);
                             recycledProcessors.offer(result);
                             if (state == SocketState.OPEN) {
-                                ((AprEndpoint)proto.endpoint).getPoller().add(socket);
+                                ((AprEndpoint)proto.endpoint).getPoller().add(socket.getSocket().longValue());
                             }
                         } else {
-                            ((AprEndpoint)proto.endpoint).getCometPoller().add(socket);
+                            ((AprEndpoint)proto.endpoint).getCometPoller().add(socket.getSocket().longValue());
                         }
                     }
-                } else if (result.async) {
+                } else if (result.isAsync()) {
                     state = asyncDispatch(socket, status);
                 }
             }
@@ -341,7 +344,7 @@ public class Http11AprProtocol extends A
         }
         
         @Override
-        public SocketState process(long socket) {
+        public SocketState process(SocketWrapper<Long> socket) {
             Http11AprProcessor processor = recycledProcessors.poll();
             try {
                 if (processor == null) {
@@ -350,11 +353,13 @@ public class Http11AprProtocol extends A
 
                 SocketState state = processor.process(socket);
                 if (state == SocketState.LONG) {
-                    // Associate the connection with the processor. The next request 
-                    // processed by this thread will use either a new or a recycled
-                    // processor.
-                    connections.put(Long.valueOf(socket), processor);
-                    ((AprEndpoint)proto.endpoint).getCometPoller().add(socket);
+                    // Check if the post processing is going to change the state
+                    state = processor.asyncPostProcess();
+                }
+                if (state == SocketState.LONG || state == SocketState.ASYNC_END) {
+                    // Need to make socket available for next processing cycle
+                    // but no need for the poller
+                    connections.put(socket, processor);
                 } else {
                     recycledProcessors.offer(processor);
                 }
@@ -386,8 +391,8 @@ public class Http11AprProtocol extends A
         }
 
         @Override
-        public SocketState asyncDispatch(long socket, SocketStatus status) {
-            Http11AprProcessor result = connections.get(Long.valueOf(socket));
+        public SocketState asyncDispatch(SocketWrapper<Long> socket, SocketStatus status) {
+            Http11AprProcessor result = connections.get(socket);
             
             SocketState state = SocketState.CLOSED; 
             if (result != null) {
@@ -404,11 +409,14 @@ public class Http11AprProtocol extends A
                     Http11AprProtocol.log.error
                         (sm.getString("http11protocol.proto.error"), e);
                 } finally {
-                    if (state != SocketState.LONG) {
-                        connections.remove(Long.valueOf(socket));
+                    if (state == SocketState.LONG && result.isAsync()) {
+                        state = result.asyncPostProcess();
+                    }
+                    if (state != SocketState.LONG && state != SocketState.ASYNC_END) {
+                        connections.remove(socket);
                         recycledProcessors.offer(result);
                         if (state == SocketState.OPEN) {
-                            ((AprEndpoint)proto.endpoint).getPoller().add(socket);
+                            ((AprEndpoint)proto.endpoint).getPoller().add(socket.getSocket().longValue());
                         }
                     }
                 }
@@ -440,15 +448,29 @@ public class Http11AprProtocol extends A
                 synchronized (this) {
                     try {
                         long count = registerCount.incrementAndGet();
-                        RequestInfo rp = processor.getRequest().getRequestProcessor();
+                        final RequestInfo rp = processor.getRequest().getRequestProcessor();
                         rp.setGlobalProcessor(global);
-                        ObjectName rpName = new ObjectName
+                        final ObjectName rpName = new ObjectName
                             (proto.getDomain() + ":type=RequestProcessor,worker="
                                 + proto.getName() + ",name=HttpRequest" + count);
                         if (log.isDebugEnabled()) {
                             log.debug("Register " + rpName);
                         }
-                        Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                        if (Constants.IS_SECURITY_ENABLED) {
+                            AccessController.doPrivileged(new PrivilegedAction<Void>() {
+                                @Override
+                                public Void run() {
+                                    try {
+                                        Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                                    } catch (Exception e) {
+                                        log.warn("Error registering request");
+                                    }
+                                    return null;
+                                }
+                            });
+                        } else {
+                            Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                        }
                         rp.setRpName(rpName);
                     } catch (Exception e) {
                         log.warn("Error registering request");
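
The last hunk above wraps the JMX registration in AccessController.doPrivileged() when a security manager is in use. As a rough illustration of that pattern only, here is a self-contained sketch. The class PrivilegedRegistration, the register() method and the in-memory list are hypothetical and stand in for Registry.registerComponent(); the securityEnabled parameter stands in for Constants.IS_SECURITY_ENABLED. Note that AccessController is deprecated for removal in recent JDKs, so this is shown for context rather than as a recommendation.

```java
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; not part of the commit.
public class PrivilegedRegistration {

    // Stand-in for the JMX registry: just remembers registered names.
    static final List<String> registered = new ArrayList<>();

    // Runs the registration inside doPrivileged() only when the
    // (assumed) security flag is set, and calls it directly otherwise.
    @SuppressWarnings("removal")
    static void register(final String name, boolean securityEnabled) {
        if (securityEnabled) {
            AccessController.doPrivileged(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    registered.add(name);
                    return null;
                }
            });
        } else {
            registered.add(name);
        }
    }

    public static void main(String[] args) {
        register("HttpRequest1", true);
        register("HttpRequest2", false);
        System.out.println(registered);
    }
}
```

The locals captured by the anonymous class must be final, which is presumably why the commit adds the final modifier to rp and rpName.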




