hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r549881 - in /jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol: ThrottlingHttpClientHandler.java ThrottlingHttpServiceHandler.java
Date Fri, 22 Jun 2007 17:04:09 GMT
Author: olegk
Date: Fri Jun 22 10:04:08 2007
New Revision: 549881

URL: http://svn.apache.org/viewvc?view=rev&rev=549881
Log:
HTTPCORE-83: Introduced additional safeguards to ensure only one worker thread can access
the shared buffer at a time (throttling handlers may spawn a new worker thread before the
previous one terminates in order to avoid blocking the I/O dispatcher)

Modified:
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpClientHandler.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpClientHandler.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpClientHandler.java?view=diff&rev=549881&r1=549880&r2=549881
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpClientHandler.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpClientHandler.java Fri Jun 22 10:04:08 2007
@@ -403,12 +403,38 @@
                 public void run() {
                     try {
 
+                        // Block until previous request is fully processed and 
+                        // the worker thread no longer holds the shared buffer
+                        synchronized (connState) {
+                            try {
+                                for (;;) {
+                                    int currentState = connState.getOutputState();
+                                    if (!connState.isWorkerRunning()) {
+                                        break;
+                                    }
+                                    if (currentState == ServerConnState.SHUTDOWN) {
+                                        throw new InterruptedIOException("Service interrupted");
+                                    }
+                                    connState.wait();
+                                }
+                            } catch (InterruptedException ex) {
+                                connState.shutdown();
+                                return;
+                            }
+                            connState.setWorkerRunning(true);
+                        }
+                        
                         HttpEntity entity = request.getEntity();
                         OutputStream outstream = new ContentOutputStream(
                                 connState.getOutbuffer());
                         entity.writeTo(outstream);
                         outstream.flush();
                         outstream.close();
+
+                        synchronized (connState) {
+                            connState.setWorkerRunning(false);
+                            connState.notifyAll();
+                        }
                         
                     } catch (IOException ex) {
                         shutdownConnection(conn, ex);
@@ -434,6 +460,27 @@
             public void run() {
                 try {
 
+                    // Block until previous request is fully processed and 
+                    // the worker thread no longer holds the shared buffer
+                    synchronized (connState) {
+                        try {
+                            for (;;) {
+                                int currentState = connState.getOutputState();
+                                if (!connState.isWorkerRunning()) {
+                                    break;
+                                }
+                                if (currentState == ServerConnState.SHUTDOWN) {
+                                    throw new InterruptedIOException("Service interrupted");
+                                }
+                                connState.wait();
+                            }
+                        } catch (InterruptedException ex) {
+                            connState.shutdown();
+                            return;
+                        }
+                        connState.setWorkerRunning(true);
+                    }
+                    
                     execHandler.handleResponse(response, context);
                     
                     synchronized (connState) {
@@ -458,6 +505,8 @@
                         if (conn.isOpen()) {
                             conn.requestOutput();
                         }
+                        connState.setWorkerRunning(false);
+                        connState.notifyAll();
                     }
                     
                 } catch (IOException ex) {
@@ -500,13 +549,15 @@
         private final SharedInputBuffer inbuffer; 
         private final SharedOutputBuffer outbuffer;
 
-        private int inputState;
-        private int outputState;
+        private volatile int inputState;
+        private volatile int outputState;
         
-        private HttpRequest request;
-        private HttpResponse response;
+        private volatile HttpRequest request;
+        private volatile HttpResponse response;
+
+        private volatile int timeout;
 
-        private int timeout;
+        private volatile boolean workerRunning; 
         
         public ClientConnState(
                 int bufsize, 
@@ -567,6 +618,14 @@
             this.timeout = timeout;
         }
             
+        public boolean isWorkerRunning() {
+            return this.workerRunning;
+        }
+
+        public void setWorkerRunning(boolean b) {
+            this.workerRunning = b;
+        }
+
         public void shutdown() {
             this.inbuffer.shutdown();
             this.outbuffer.shutdown();

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java?view=diff&rev=549881&r1=549880&r2=549881
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java Fri Jun 22 10:04:08 2007
@@ -175,8 +175,8 @@
         final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
 
         synchronized (connState) {
-            connState.setRequest(request);
             connState.setInputState(ServerConnState.REQUEST_RECEIVED);
+            connState.setRequest(request);
 
             boolean contentExpected = false;
             if (request instanceof HttpEntityEnclosingRequest) {
@@ -266,6 +266,9 @@
                     if (statusCode >= 200 && entity == null) {
                         connState.resetOutput();
                         connState.resetInput();
+                        if (!connState.isWorkerRunning()) {
+                            conn.requestInput();
+                        }
 
                         if (!this.connStrategy.keepAlive(response, context)) {
                             conn.close();
@@ -307,6 +310,10 @@
                     connState.resetOutput();
                     connState.resetInput();
 
+                    if (!connState.isWorkerRunning()) {
+                        conn.requestInput();
+                    }
+
                     if (!this.connStrategy.keepAlive(response, context)) {
                         conn.close();
                     }
@@ -325,27 +332,6 @@
         }
     }
  
-    private void waitForOutputState(
-            final ServerConnState connState, 
-            int expectedState) throws InterruptedIOException {
-        synchronized (connState) {
-            try {
-                for (;;) {
-                    int currentState = connState.getOutputState();
-                    if (currentState == expectedState) {
-                        break;
-                    }
-                    if (currentState == ServerConnState.SHUTDOWN) {
-                        throw new InterruptedIOException("Service interrupted");
-                    }
-                    connState.wait();
-                }
-            } catch (InterruptedException ex) {
-                connState.shutdown();
-            }
-        }
-    }
-    
     private void handleException(final HttpException ex, final HttpResponse response) {
         if (ex instanceof MethodNotSupportedException) {
             response.setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED);
@@ -366,13 +352,32 @@
             final ServerConnState connState,
             final NHttpServerConnection conn) throws HttpException, IOException {
 
-        waitForOutputState(connState, ServerConnState.READY);
-
         HttpContext context = conn.getContext();
-        HttpRequest request = connState.getRequest();
-        
-        HttpParamsLinker.link(request, this.params);
-        
+        HttpRequest request = null;
+
+        // Block until previous request is fully processed and 
+        // the worker thread no longer holds the shared buffer
+        synchronized (connState) {
+            try {
+                for (;;) {
+                    int currentState = connState.getOutputState();
+                    if (!connState.isWorkerRunning()) {
+                        break;
+                    }
+                    if (currentState == ServerConnState.SHUTDOWN) {
+                        throw new InterruptedIOException("Service interrupted");
+                    }
+                    connState.wait();
+                }
+            } catch (InterruptedException ex) {
+                connState.shutdown();
+                return;
+            }
+            connState.setWorkerRunning(true);
+            request = connState.getRequest();
+            HttpParamsLinker.link(request, this.params);
+        }
+
         context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
         context.setAttribute(HttpExecutionContext.HTTP_REQUEST, request);
 
@@ -386,9 +391,9 @@
         HttpResponse response = null;
 
         if (request instanceof HttpEntityEnclosingRequest) {
-            HttpEntityEnclosingRequest entityReq = (HttpEntityEnclosingRequest) request;
+            HttpEntityEnclosingRequest eeRequest = (HttpEntityEnclosingRequest) request;
             
-            if (entityReq.expectContinue()) {
+            if (eeRequest.expectContinue()) {
                 response = this.responseFactory.newHttpResponse(
                         ver, 
                         HttpStatus.SC_CONTINUE, 
@@ -412,24 +417,41 @@
                     synchronized (connState) {
                         connState.setResponse(response);
                         conn.requestOutput();
-                        waitForOutputState(connState, ServerConnState.RESPONSE_SENT);
+                        
+                        // Block until 1xx response is sent to the client
+                        try {
+                            for (;;) {
+                                int currentState = connState.getOutputState();
+                                if (currentState == ServerConnState.RESPONSE_SENT) {
+                                    break;
+                                }
+                                if (currentState == ServerConnState.SHUTDOWN) {
+                                    throw new InterruptedIOException("Service interrupted");
+                                }
+                                connState.wait();
+                            }
+                        } catch (InterruptedException ex) {
+                            connState.shutdown();
+                            return;
+                        }
                         connState.resetOutput();
+                        response = null;
                     }
-                    response = null;
                 } else {
-                    // Discard entity
+                    // Discard request entity
                     conn.resetInput();
-                    entityReq.setEntity(null);
+                    eeRequest.setEntity(null);
                 }
 
             }
 
             // Create a wrapper entity instead of the original one
-            if (entityReq.getEntity() != null) {
-                entityReq.setEntity(new ContentBufferEntity(
-                        entityReq.getEntity(), 
+            if (eeRequest.getEntity() != null) {
+                eeRequest.setEntity(new ContentBufferEntity(
+                        eeRequest.getEntity(), 
                         connState.getInbuffer()));
             }
+            
         }
 
         if (response == null) {
@@ -483,8 +505,13 @@
             outstream.flush();
             outstream.close();
         }
-        if (conn.isOpen()) {
-            conn.requestInput();
+        
+        synchronized (connState) {
+            if (connState.getOutputState() == ServerConnState.READY && conn.isOpen()) {
+                conn.requestInput();
+            }
+            connState.setWorkerRunning(false);
+            connState.notifyAll();
         }
     }
     
@@ -509,7 +536,6 @@
         public static final int REQUEST_BODY_DONE          = 4;
         public static final int RESPONSE_SENT              = 8;
         public static final int RESPONSE_BODY_STREAM       = 16;
-        public static final int RESPONSE_BODY_DONE         = 32;
         
         private final SharedInputBuffer inbuffer; 
         private final SharedOutputBuffer outbuffer;
@@ -520,6 +546,8 @@
         private volatile HttpRequest request;
         private volatile HttpResponse response;
         
+        private volatile boolean workerRunning; 
+        
         public ServerConnState(
                 int bufsize, 
                 final IOControl ioControl, 
@@ -569,6 +597,14 @@
 
         public void setResponse(final HttpResponse response) {
             this.response = response;
+        }
+
+        public boolean isWorkerRunning() {
+            return this.workerRunning;
+        }
+
+        public void setWorkerRunning(boolean b) {
+            this.workerRunning = b;
         }
 
         public void shutdown() {



Mime
View raw message