hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1234543 - in /httpcomponents/httpcore/trunk/httpcore-nio/src: main/java/org/apache/http/nio/protocol/ test/java/org/apache/http/nio/integration/ test/java/org/apache/http/nio/protocol/
Date Sun, 22 Jan 2012 16:27:32 GMT
Author: olegk
Date: Sun Jan 22 16:27:32 2012
New Revision: 1234543

URL: http://svn.apache.org/viewvc?rev=1234543&view=rev
Log:
HTTPCORE-291: fixed out of sequence responses to pipelined requests

Added:
    httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/integration/TestPipelining.java
  (with props)
Modified:
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncService.java

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java?rev=1234543&r1=1234542&r2=1234543&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java
Sun Jan 22 16:27:32 2012
@@ -190,13 +190,19 @@ public class HttpAsyncService implements
         State state = ensureNotNull(getState(conn));
         if (state != null) {
             synchronized (state) {
+                state.setTerminated();
                 closeHandlers(state, cause);
+                Cancellable cancellable = state.getCancellable();
+                if (cancellable != null) {
+                    cancellable.cancel();
+                }
                 if (cause instanceof HttpException) {
                     if (conn.isResponseSubmitted()
-                            || state.getResponseState() != MessageState.READY) {
+                            || state.getResponseState().compareTo(MessageState.INIT) >
0) {
                         // There is not much that we can do if a response
                         // has already been submitted
                         closeConnection(conn);
+                        log(cause);
                     } else {
                         HttpContext context = state.getContext();
                         HttpAsyncResponseProducer responseProducer = handleException(
@@ -209,7 +215,6 @@ public class HttpAsyncService implements
                         } catch (Exception ex) {
                             shutdownConnection(conn);
                             closeHandlers(state);
-                            state.reset();
                             if (ex instanceof RuntimeException) {
                                 throw (RuntimeException) ex;
                             } else {
@@ -219,7 +224,6 @@ public class HttpAsyncService implements
                     }
                 } else {
                     shutdownConnection(conn);
-                    state.reset();
                 }
             }
         } else {
@@ -509,6 +513,7 @@ public class HttpAsyncService implements
         HttpAsyncRequestConsumer<?> consumer = state.getRequestConsumer();
         consumer.requestCompleted(context);
         state.setRequestState(MessageState.COMPLETED);
+        state.setResponseState(MessageState.INIT);
         Exception exception = consumer.getException();
         if (exception != null) {
             HttpAsyncResponseProducer responseProducer = handleException(exception, context);

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java?rev=1234543&r1=1234542&r2=1234543&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java
Sun Jan 22 16:27:32 2012
@@ -28,6 +28,6 @@ package org.apache.http.nio.protocol;
 
 enum MessageState {
 
-    READY, ACK_EXPECTED, ACK, BODY_STREAM, COMPLETED
+    READY, INIT, ACK_EXPECTED, ACK, BODY_STREAM, COMPLETED
 
 }

Added: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/integration/TestPipelining.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/integration/TestPipelining.java?rev=1234543&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/integration/TestPipelining.java
(added)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/integration/TestPipelining.java
Sun Jan 22 16:27:32 2012
@@ -0,0 +1,267 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.http.nio.integration;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.http.HttpCoreNIOTestBase;
+import org.apache.http.HttpException;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpResponseInterceptor;
+import org.apache.http.LoggingClientConnectionFactory;
+import org.apache.http.LoggingServerConnectionFactory;
+import org.apache.http.concurrent.Cancellable;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.nio.DefaultNHttpClientConnection;
+import org.apache.http.impl.nio.DefaultNHttpServerConnection;
+import org.apache.http.nio.NHttpConnectionFactory;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
+import org.apache.http.nio.protocol.BasicAsyncRequestHandler;
+import org.apache.http.nio.protocol.HttpAsyncExchange;
+import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
+import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
+import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
+import org.apache.http.nio.protocol.HttpAsyncService;
+import org.apache.http.nio.reactor.IOReactorStatus;
+import org.apache.http.nio.reactor.ListenerEndpoint;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.HttpRequestHandler;
+import org.apache.http.protocol.ImmutableHttpProcessor;
+import org.apache.http.protocol.ResponseConnControl;
+import org.apache.http.protocol.ResponseContent;
+import org.apache.http.protocol.ResponseServer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for handling pipelined requests.
+ */
+public class TestPipelining extends HttpCoreNIOTestBase {
+
+    @Before
+    public void setUp() throws Exception {
+        initServer();
+        this.serverHttpProc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
+                new ResponseServer(),
+                new ResponseContent(),
+                new ResponseConnControl()
+        });
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        shutDownServer();
+    }
+
+    @Override
+    protected NHttpConnectionFactory<DefaultNHttpServerConnection> createServerConnectionFactory(
+            final HttpParams params) throws Exception {
+        return new LoggingServerConnectionFactory(params);
+    }
+
+    @Override
+    protected NHttpConnectionFactory<DefaultNHttpClientConnection> createClientConnectionFactory(
+            final HttpParams params) throws Exception {
+        return new LoggingClientConnectionFactory(params);
+    }
+
+    @Test
+    public void testBasicPipelining() throws Exception {
+        HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
+        registry.register("*", new BasicAsyncRequestHandler(new HttpRequestHandler() {
+
+            public void handle(
+                    final HttpRequest request,
+                    final HttpResponse response,
+                    final HttpContext context) throws HttpException, IOException {
+                String content = "thank you very much";
+                NStringEntity entity = NStringEntity.create(content, ContentType.DEFAULT_TEXT);
+                response.setEntity(entity);
+            }
+            
+        }));
+        HttpAsyncService serviceHandler = new HttpAsyncService(
+                this.serverHttpProc,
+                new DefaultConnectionReuseStrategy(),
+                registry,
+                this.serverParams);
+        this.server.start(serviceHandler);
+
+        ListenerEndpoint endpoint = this.server.getListenerEndpoint();
+        endpoint.waitFor();
+
+        Assert.assertEquals("Test server status", IOReactorStatus.ACTIVE, this.server.getStatus());
+        
+        InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
+        Socket socket = new Socket("localhost", address.getPort());
+        try {
+            OutputStream outstream = socket.getOutputStream();
+            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outstream,
"US-ASCII"));
+            writer.write("GET / HTTP/1.1\r\n");
+            writer.write("Host: localhost\r\n");
+            writer.write("\r\n");
+            writer.write("GET / HTTP/1.1\r\n");
+            writer.write("Host: localhost\r\n");
+            writer.write("Connection: close\r\n");
+            writer.write("\r\n");
+            writer.flush();
+            InputStream instream = socket.getInputStream();
+            BufferedReader reader = new BufferedReader(new InputStreamReader(instream, "US-ASCII"));
+            StringBuilder buf = new StringBuilder();
+            char[] tmp = new char[1024];
+            int l;
+            while ((l = reader.read(tmp)) != -1) {
+                buf.append(tmp, 0, l);
+            }
+            reader.close();
+            writer.close();
+            String expected = 
+                    "HTTP/1.1 200 OK\r\n" +
+                    "Server: TEST-SERVER/1.1\r\n" +
+                    "Content-Length: 19\r\n" +
+                    "Content-Type: text/plain; charset=ISO-8859-1\r\n" +
+                    "\r\n" +
+                    "thank you very much" +
+                    "HTTP/1.1 200 OK\r\n" +
+                    "Server: TEST-SERVER/1.1\r\n" +
+                    "Content-Length: 19\r\n" +
+                    "Content-Type: text/plain; charset=ISO-8859-1\r\n" +
+                    "Connection: close\r\n" +
+                    "\r\n" +
+                    "thank you very much";
+            Assert.assertEquals(expected, buf.toString());
+            
+        } finally {
+            socket.close();
+        }
+        
+    }
+
+    @Test
+    public void testPipeliningWithCancellable() throws Exception {
+        
+        final Cancellable cancellable = Mockito.mock(Cancellable.class);
+        
+        HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
+        registry.register("/long", new HttpAsyncRequestHandler<HttpRequest>() {
+
+            public HttpAsyncRequestConsumer<HttpRequest> processRequest(final HttpRequest
request,
+                    final HttpContext context) {
+                return new BasicAsyncRequestConsumer();
+            }
+
+            public void handle(
+                    final HttpRequest request,
+                    final HttpAsyncExchange httpexchange,
+                    final HttpContext context) throws HttpException, IOException {
+                httpexchange.setCallback(cancellable);
+                // do not submit a response;
+            }
+            
+        });
+        registry.register("/short", new BasicAsyncRequestHandler(new HttpRequestHandler()
{
+
+            public void handle(
+                    final HttpRequest request,
+                    final HttpResponse response,
+                    final HttpContext context) throws HttpException, IOException {
+                String content = "thank you very much";
+                NStringEntity entity = NStringEntity.create(content, ContentType.DEFAULT_TEXT);
+                response.setEntity(entity);
+            }
+            
+        }));
+        HttpAsyncService serviceHandler = new HttpAsyncService(
+                this.serverHttpProc,
+                new DefaultConnectionReuseStrategy(),
+                registry,
+                this.serverParams);
+        this.server.start(serviceHandler);
+
+        ListenerEndpoint endpoint = this.server.getListenerEndpoint();
+        endpoint.waitFor();
+
+        Assert.assertEquals("Test server status", IOReactorStatus.ACTIVE, this.server.getStatus());
+        
+        InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
+        Socket socket = new Socket("localhost", address.getPort());
+        try {
+            OutputStream outstream = socket.getOutputStream();
+            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outstream,
"US-ASCII"));
+            writer.write("GET /long HTTP/1.1\r\n");
+            writer.write("Host: localhost\r\n");
+            writer.write("\r\n");
+            writer.write("GET /short HTTP/1.1\r\n");
+            writer.write("Host: localhost\r\n");
+            writer.write("Connection: close\r\n");
+            writer.write("\r\n");
+            writer.flush();
+            InputStream instream = socket.getInputStream();
+            BufferedReader reader = new BufferedReader(new InputStreamReader(instream, "US-ASCII"));
+            StringBuilder buf = new StringBuilder();
+            char[] tmp = new char[1024];
+            int l;
+            while ((l = reader.read(tmp)) != -1) {
+                buf.append(tmp, 0, l);
+            }
+            reader.close();
+            writer.close();
+            
+            String expected = 
+                    "HTTP/1.1 400 Bad Request\r\n" +
+                    "Connection: Close\r\n" +
+                    "Server: TEST-SERVER/1.1\r\n" +
+                    "Content-Length: 70\r\n" +
+                    "Content-Type: text/plain; charset=ISO-8859-1\r\n" +
+                    "\r\n" +
+                    "Out of sequence request message detected (pipelining is not supported)";
+            Assert.assertEquals(expected, buf.toString());
+            
+        } finally {
+            socket.close();
+        }
+        
+        Mockito.verify(cancellable).cancel();
+    }
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/integration/TestPipelining.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/integration/TestPipelining.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/integration/TestPipelining.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncService.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncService.java?rev=1234543&r1=1234542&r2=1234543&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncService.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncService.java
Sun Jan 22 16:27:32 2012
@@ -169,6 +169,7 @@ public class TestHttpAsyncService {
         state.setRequestState(MessageState.READY);
         state.setResponseState(MessageState.READY);
         state.setRequestConsumer(this.requestConsumer);
+        state.setCancellable(this.cancellable);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
         HttpException httpex = new HttpException();
@@ -182,6 +183,7 @@ public class TestHttpAsyncService {
 
         Mockito.verify(this.requestConsumer).failed(httpex);
         Mockito.verify(this.requestConsumer).close();
+        Mockito.verify(this.cancellable).cancel();
         Mockito.verify(this.conn, Mockito.never()).shutdown();
         Mockito.verify(this.conn, Mockito.never()).close();
     }
@@ -194,6 +196,7 @@ public class TestHttpAsyncService {
         state.setResponseState(MessageState.READY);
         state.setRequestConsumer(this.requestConsumer);
         state.setResponseProducer(this.responseProducer);
+        state.setCancellable(this.cancellable);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
         Mockito.doThrow(new RuntimeException()).when(this.httpProcessor).process(
@@ -208,6 +211,7 @@ public class TestHttpAsyncService {
             Mockito.verify(this.requestConsumer, Mockito.atLeastOnce()).close();
             Mockito.verify(this.responseProducer).failed(httpex);
             Mockito.verify(this.responseProducer, Mockito.atLeastOnce()).close();
+            Mockito.verify(this.cancellable).cancel();
         }
     }
 
@@ -219,6 +223,7 @@ public class TestHttpAsyncService {
         state.setResponseState(MessageState.READY);
         state.setRequestConsumer(this.requestConsumer);
         state.setResponseProducer(this.responseProducer);
+        state.setCancellable(this.cancellable);
         this.connContext.setAttribute(HttpAsyncService.HTTP_EXCHANGE_STATE, state);
 
         Mockito.doThrow(new IOException()).when(this.httpProcessor).process(
@@ -232,6 +237,7 @@ public class TestHttpAsyncService {
         Mockito.verify(this.requestConsumer, Mockito.atLeastOnce()).close();
         Mockito.verify(this.responseProducer).failed(httpex);
         Mockito.verify(this.responseProducer, Mockito.atLeastOnce()).close();
+        Mockito.verify(this.cancellable).cancel();
     }
 
     @Test
@@ -294,7 +300,7 @@ public class TestHttpAsyncService {
         this.protocolHandler.requestReceived(this.conn);
 
         Assert.assertEquals(MessageState.COMPLETED, state.getRequestState());
-        Assert.assertEquals(MessageState.READY, state.getResponseState());
+        Assert.assertEquals(MessageState.INIT, state.getResponseState());
 
         Assert.assertSame(request, state.getRequest());
         Assert.assertSame(this.requestHandler, state.getRequestHandler());
@@ -546,7 +552,7 @@ public class TestHttpAsyncService {
         this.protocolHandler.inputReady(conn, this.decoder);
 
         Assert.assertEquals(MessageState.COMPLETED, state.getRequestState());
-        Assert.assertEquals(MessageState.READY, state.getResponseState());
+        Assert.assertEquals(MessageState.INIT, state.getResponseState());
 
         Mockito.verify(this.requestConsumer).consumeContent(this.decoder, this.conn);
         Mockito.verify(this.conn).suspendInput();
@@ -575,7 +581,7 @@ public class TestHttpAsyncService {
         this.protocolHandler.inputReady(conn, this.decoder);
 
         Assert.assertEquals(MessageState.COMPLETED, state.getRequestState());
-        Assert.assertEquals(MessageState.READY, state.getResponseState());
+        Assert.assertEquals(MessageState.INIT, state.getResponseState());
         Assert.assertNotNull(state.getResponseProducer());
 
         Mockito.verify(this.requestConsumer).consumeContent(this.decoder, this.conn);
@@ -612,7 +618,7 @@ public class TestHttpAsyncService {
         this.protocolHandler.inputReady(conn, this.decoder);
 
         Assert.assertEquals(MessageState.COMPLETED, state.getRequestState());
-        Assert.assertEquals(MessageState.READY, state.getResponseState());
+        Assert.assertEquals(MessageState.INIT, state.getResponseState());
         Assert.assertNotNull(state.getResponseProducer());
 
         Mockito.verify(this.requestConsumer).consumeContent(this.decoder, this.conn);



Mime
View raw message