tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject svn commit: r1514228 - in /tomcat/trunk: java/org/apache/catalina/connector/ java/org/apache/coyote/ java/org/apache/coyote/http11/ java/org/apache/tomcat/util/net/ test/org/apache/catalina/nonblocking/
Date Thu, 15 Aug 2013 10:32:15 GMT
Author: markt
Date: Thu Aug 15 10:32:15 2013
New Revision: 1514228

URL: http://svn.apache.org/r1514228
Log:
The container is responsible for the first call to each of onWritePossible() and onDataAvailable()
once a listener has been set.
Main component is the addition to the SocketWrapper of a list of dispatch types that need
to be made. "Dispatch type" in this case meaning "process the socket using the specified SocketStatus".
This is used to trigger the first call to each of onWritePossible() and onDataAvailable()
for which the container is responsible.
Fix some additional issues identified in the test case.

Added:
    tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java   (with props)
Modified:
    tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java
    tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
    tomcat/trunk/java/org/apache/coyote/ActionCode.java
    tomcat/trunk/java/org/apache/coyote/Response.java
    tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java
    tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java

Modified: tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java Thu Aug 15 10:32:15 2013
@@ -249,6 +249,18 @@ public class InputBuffer extends Reader
 
     public void setReadListener(ReadListener listener) {
         coyoteRequest.setReadListener(listener);
+
+        // The container is responsible for the first call to
+        // listener.onDataAvailable(). If isReady() returns true, the container
+        // needs to call listener.onDataAvailable() from a new thread. If
+        // isReady() returns false, the socket will be registered for read and
+        // the container will call listener.onDataAvailable() once data arrives.
+        // Must call isFinished() first as a call to isReady() if the request
+        // has been finished will register the socket for read interest and that
+        // is not required.
+        if (isFinished() || isReady()) {
+            coyoteRequest.action(ActionCode.DISPATCH_READ, null);
+        }
     }
 
 

Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Thu Aug 15 10:32:15 2013
@@ -38,6 +38,7 @@ import org.apache.tomcat.util.collection
 import org.apache.tomcat.util.modeler.Registry;
 import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler;
+import org.apache.tomcat.util.net.DispatchType;
 import org.apache.tomcat.util.net.SocketStatus;
 import org.apache.tomcat.util.net.SocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
@@ -616,7 +617,11 @@ public abstract class AbstractProtocol i
 
                 SocketState state = SocketState.CLOSED;
                 do {
-                    if (status == SocketStatus.DISCONNECT &&
+                    if (wrapper.hasNextDispatch()) {
+                        DispatchType nextDispatch = wrapper.getNextDispatch();
+                        state = processor.asyncDispatch(
+                                nextDispatch.getSocketStatus());
+                    } else if (status == SocketStatus.DISCONNECT &&
                             !processor.isComet()) {
                         // Do nothing here, just wait for it to get recycled
                         // Don't do this for Comet we need to generate an end
@@ -663,7 +668,8 @@ public abstract class AbstractProtocol i
                                 "], State out: [" + state + "]");
                     }
                 } while (state == SocketState.ASYNC_END ||
-                        state == SocketState.UPGRADING);
+                        state == SocketState.UPGRADING ||
+                        wrapper.hasNextDispatch());
 
                 if (state == SocketState.LONG) {
                     // In the middle of processing a request/response. Keep the

Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Thu Aug 15 10:32:15 2013
@@ -215,5 +215,17 @@ public enum ActionCode {
     /**
      * Indicates if the request body has been fully read.
      */
-    REQUEST_BODY_FULLY_READ
+    REQUEST_BODY_FULLY_READ,
+
+    /**
+     * Indicates that the container needs to trigger a call to onDataAvailable()
+     * for the registered non-blocking read listener.
+     */
+    DISPATCH_READ,
+
+    /**
+     * Indicates that the container needs to trigger a call to onWritePossible()
+     * for the registered non-blocking write listener.
+     */
+    DISPATCH_WRITE
 }

Modified: tomcat/trunk/java/org/apache/coyote/Response.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Response.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/Response.java (original)
+++ tomcat/trunk/java/org/apache/coyote/Response.java Thu Aug 15 10:32:15 2013
@@ -593,6 +593,21 @@ public final class Response {
         }
 
         this.listener = listener;
+
+        // The container is responsible for the first call to
+        // listener.onWritePossible(). If isReady() returns true, the container
+        // needs to call listener.onWritePossible() from a new thread. If
+        // isReady() returns false, the socket will be registered for write and
+        // the container will call listener.onWritePossible() once data can be
+        // written.
+        if (isReady()) {
+            action(ActionCode.DISPATCH_WRITE, null);
+            // Need to set the fireListener flag otherwise when the container
+            // tries to trigger onWritePossible, nothing will happen
+            synchronized (nonBlockingStateLock) {
+                fireListener = true;
+            }
+        }
     }
 
     public boolean isReady() {

Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java Thu Aug 15 10:32:15 2013
@@ -51,6 +51,7 @@ import org.apache.tomcat.util.http.MimeH
 import org.apache.tomcat.util.log.UserDataHelper;
 import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.DispatchType;
 import org.apache.tomcat.util.net.SocketStatus;
 import org.apache.tomcat.util.net.SocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
@@ -828,6 +829,10 @@ public abstract class AbstractHttp11Proc
         } else if (actionCode == ActionCode.REQUEST_BODY_FULLY_READ) {
             AtomicBoolean result = (AtomicBoolean) param;
             result.set(getInputBuffer().isFinished());
+        } else if (actionCode == ActionCode.DISPATCH_READ) {
+            socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ);
+        } else if (actionCode == ActionCode.DISPATCH_WRITE) {
+            socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE);
         } else {
             actionInternal(actionCode, param);
         }

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java Thu Aug 15 10:32:15 2013
@@ -554,7 +554,10 @@ public class InternalInputBuffer extends
 
     @Override
     protected int nbRead() throws IOException {
-        throw new IllegalStateException("This method is unused for BIO");
+        // If this gets called for BIO need to make caller think there is data
+        // to read as BIO always reads whether there is data or not (and blocks
+        // until there is data to read).
+        return 1;
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Aug 15 10:32:15 2013
@@ -1718,16 +1718,30 @@ public class AprEndpoint extends Abstrac
                                         // application code. By signalling read/write is possible, a
                                         // read/write will be attempted, fail and that will trigger
                                         // an exception the application will see.
-                                        if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN ||
-                                                (wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
-                                            // Must be doing a non-blocking read
+                                        // Check the return flags first, followed by what the socket
+                                        // was registered for
+                                        if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
+                                            // Error probably occurred during a non-blocking read
                                             if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) {
                                                 // Close socket and clear pool
                                                 destroySocket(desc[n*2+1]);
                                             }
-                                        } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT ||
-                                                (wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
-                                            // Must be doing an non-blocking write write
+                                        } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
+                                            // Error probably occurred during a non-blocking write
+                                            if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) {
+                                                // Close socket and clear pool
+                                                destroySocket(desc[n*2+1]);
+                                            }
+                                        } else if ((wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
+                                            // Can't tell what was happening when the error occurred but the
+                                            // socket is registered for non-blocking read so use that
+                                            if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) {
+                                                // Close socket and clear pool
+                                                destroySocket(desc[n*2+1]);
+                                            }
+                                        } else if ((wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
+                                            // Can't tell what was happening when the error occurred but the
+                                            // socket is registered for non-blocking write so use that
                                             if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) {
                                                 // Close socket and clear pool
                                                 destroySocket(desc[n*2+1]);

Added: tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java?rev=1514228&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java Thu Aug 15 10:32:15 2013
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+/**
+ * This enumeration lists the different types of dispatches that request
+ * processing can trigger. In this instance, dispatch means re-process this
+ * request using the given socket status.
+ */
+public enum DispatchType {
+
+    NON_BLOCKING_READ(SocketStatus.OPEN_READ),
+    NON_BLOCKING_WRITE(SocketStatus.OPEN_WRITE);
+
+    private final SocketStatus status;
+
+    private DispatchType(SocketStatus status) {
+        this.status = status;
+    }
+
+    public SocketStatus getSocketStatus() {
+        return status;
+    }
+}

Propchange: tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java Thu Aug 15 10:32:15 2013
@@ -16,6 +16,9 @@
  */
 package org.apache.tomcat.util.net;
 
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -60,6 +63,8 @@ public class SocketWrapper<E> {
     private final Object writeThreadLock = new Object();
     public Object getWriteThreadLock() { return writeThreadLock; }
 
+    private Set<DispatchType> dispatches = new LinkedHashSet<>();
+
     public SocketWrapper(E socket) {
         this.socket = socket;
         ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -108,4 +113,19 @@ public class SocketWrapper<E> {
     public WriteLock getBlockingStatusWriteLock() {
         return blockingStatusWriteLock;
     }
+    public void addDispatch(DispatchType dispatchType) {
+        dispatches.add(dispatchType);
+    }
+    public boolean hasNextDispatch() {
+        return dispatches.size() > 0;
+    }
+    public DispatchType getNextDispatch() {
+        DispatchType result = null;
+        Iterator<DispatchType> iter = dispatches.iterator();
+        if (iter.hasNext()) {
+            result = iter.next();
+            iter.remove();
+        }
+        return result;
+    }
 }

Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original)
+++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Thu Aug 15 10:32:15 2013
@@ -251,14 +251,6 @@ public class TestNonBlockingAPI extends 
     public void testNonBlockingWriteError() throws Exception {
         Tomcat tomcat = getTomcatInstance();
 
-        // Not applicable to BIO. This test does not start a new thread for the
-        // write so with BIO all the writes happen in the service() method just
-        // like blocking IO.
-        if (tomcat.getConnector().getProtocolHandlerClassName().equals(
-                "org.apache.coyote.http11.Http11Protocol")) {
-            return;
-        }
-
         // Must have a real docBase - just use temp
         StandardContext ctx = (StandardContext) tomcat.addContext(
                 "", System.getProperty("java.io.tmpdir"));
@@ -416,10 +408,8 @@ public class TestNonBlockingAPI extends 
             });
             // step 2 - notify on read
             ServletInputStream in = req.getInputStream();
-            listener = new TestReadListener(actx);
+            listener = new TestReadListener(actx, false);
             in.setReadListener(listener);
-
-            listener.onDataAvailable();
         }
     }
 
@@ -462,13 +452,12 @@ public class TestNonBlockingAPI extends 
             });
             // step 2 - notify on read
             ServletInputStream in = req.getInputStream();
-            rlistener = new TestReadListener(actx);
+            rlistener = new TestReadListener(actx, true);
             in.setReadListener(rlistener);
             ServletOutputStream out = resp.getOutputStream();
             resp.setBufferSize(200 * 1024);
             wlistener = new TestWriteListener(actx);
             out.setWriteListener(wlistener);
-            wlistener.onWritePossible();
         }
 
 
@@ -476,9 +465,12 @@ public class TestNonBlockingAPI extends 
     private class TestReadListener implements ReadListener {
         private final AsyncContext ctx;
         private final StringBuilder body = new StringBuilder();
+        private final boolean usingNonBlockingWrite;
 
-        public TestReadListener(AsyncContext ctx) {
+        public TestReadListener(AsyncContext ctx,
+                boolean usingNonBlockingWrite) {
             this.ctx = ctx;
+            this.usingNonBlockingWrite = usingNonBlockingWrite;
         }
 
         @Override
@@ -501,18 +493,22 @@ public class TestNonBlockingAPI extends 
         @Override
         public void onAllDataRead() {
             log.info("onAllDataRead");
-            String msg;
-            if (body.toString().endsWith("FINISHED")) {
-                msg = "OK";
-            } else {
-                msg = "FAILED";
-            }
-            try {
-                ctx.getResponse().getOutputStream().print(msg);
-            } catch (IOException ioe) {
-                // Ignore
+            // If non-blocking writes are being used, don't write here as it
+            // will inject unexpected data into the write output.
+            if (!usingNonBlockingWrite) {
+                String msg;
+                if (body.toString().endsWith("FINISHED")) {
+                    msg = "OK";
+                } else {
+                    msg = "FAILED";
+                }
+                try {
+                    ctx.getResponse().getOutputStream().print(msg);
+                } catch (IOException ioe) {
+                    // Ignore
+                }
+                ctx.complete();
             }
-            ctx.complete();
         }
 
         @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message