hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1629597 - in /httpcomponents/httpcore/trunk/httpcore-nio/src: main/java/org/apache/http/nio/protocol/ test/java/org/apache/http/nio/protocol/
Date Mon, 06 Oct 2014 09:19:36 GMT
Author: olegk
Date: Mon Oct  6 09:19:35 2014
New Revision: 1629597

URL: http://svn.apache.org/r1629597
Log:
Refactored the default and pipelining async exchange handlers

Modified:
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncClientExchangeHandler.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/PipeliningClientExchangeHandler.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestBasicAsyncClientExchangeHandler.java

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncClientExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncClientExchangeHandler.java?rev=1629597&r1=1629596&r2=1629597&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncClientExchangeHandler.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncClientExchangeHandler.java
Mon Oct  6 09:19:35 2014
@@ -29,6 +29,7 @@ package org.apache.http.nio.protocol;
 
 import java.io.IOException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.http.ConnectionClosedException;
 import org.apache.http.ConnectionReuseStrategy;
@@ -63,9 +64,9 @@ public class BasicAsyncClientExchangeHan
     private final NHttpClientConnection conn;
     private final HttpProcessor httppocessor;
     private final ConnectionReuseStrategy connReuseStrategy;
-
-    private volatile boolean requestSent;
-    private volatile boolean keepAlive;
+    private final AtomicBoolean requestSent;
+    private final AtomicBoolean keepAlive;
+    private final AtomicBoolean closed;
 
     /**
      * Creates new instance of BasicAsyncRequestExecutionHandler.
@@ -95,6 +96,9 @@ public class BasicAsyncClientExchangeHan
         this.httppocessor = Args.notNull(httppocessor, "HTTP processor");
         this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
             DefaultConnectionReuseStrategy.INSTANCE;
+        this.requestSent = new AtomicBoolean(false);
+        this.keepAlive = new AtomicBoolean(false);
+        this.closed = new AtomicBoolean(false);
     }
 
     /**
@@ -132,9 +136,11 @@ public class BasicAsyncClientExchangeHan
 
     @Override
     public void close() throws IOException {
-        releaseResources();
-        if (!this.future.isDone()) {
-            this.future.cancel();
+        if (this.closed.compareAndSet(false, true)) {
+            releaseResources();
+            if (!this.future.isDone()) {
+                this.future.cancel();
+            }
         }
     }
 
@@ -159,7 +165,7 @@ public class BasicAsyncClientExchangeHan
     @Override
     public void requestCompleted() {
         this.requestProducer.requestCompleted(this.localContext);
-        this.requestSent = true;
+        this.requestSent.set(true);
     }
 
     @Override
@@ -167,7 +173,7 @@ public class BasicAsyncClientExchangeHan
         this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
         this.httppocessor.process(response, this.localContext);
         this.responseConsumer.responseReceived(response);
-        this.keepAlive = this.connReuseStrategy.keepAlive(response, this.localContext);
+        this.keepAlive.set(this.connReuseStrategy.keepAlive(response, this.localContext));
     }
 
     @Override
@@ -179,7 +185,7 @@ public class BasicAsyncClientExchangeHan
     @Override
     public void responseCompleted() throws IOException {
         try {
-            if (!this.keepAlive) {
+            if (!this.keepAlive.get()) {
                 this.conn.close();
             }
             this.responseConsumer.responseCompleted(this.localContext);
@@ -190,7 +196,9 @@ public class BasicAsyncClientExchangeHan
             } else {
                 this.future.failed(ex);
             }
-            releaseResources();
+            if (this.closed.compareAndSet(false, true)) {
+                releaseResources();
+            }
         } catch (final RuntimeException ex) {
             failed(ex);
             throw ex;
@@ -204,31 +212,33 @@ public class BasicAsyncClientExchangeHan
 
     @Override
     public void failed(final Exception ex) {
-        try {
-            if (!this.requestSent) {
-                this.requestProducer.failed(ex);
-            }
-            this.responseConsumer.failed(ex);
-        } finally {
+        if (this.closed.compareAndSet(false, true)) {
             try {
-                this.future.failed(ex);
+                if (!this.requestSent.get()) {
+                    this.requestProducer.failed(ex);
+                }
+                this.responseConsumer.failed(ex);
             } finally {
-                releaseResources();
+                try {
+                    this.future.failed(ex);
+                } finally {
+                    releaseResources();
+                }
             }
         }
     }
 
     @Override
     public boolean cancel() {
-        try {
-            final boolean cancelled = this.responseConsumer.cancel();
-            this.future.cancel();
-            releaseResources();
-            return cancelled;
-        } catch (final RuntimeException ex) {
-            failed(ex);
-            throw ex;
+        if (this.closed.compareAndSet(false, true)) {
+            try {
+                this.future.cancel();
+                return this.responseConsumer.cancel();
+            } finally {
+                releaseResources();
+            }
         }
+        return false;
     }
 
     @Override

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/PipeliningClientExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/PipeliningClientExchangeHandler.java?rev=1629597&r1=1629596&r2=1629597&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/PipeliningClientExchangeHandler.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/PipeliningClientExchangeHandler.java
Mon Oct  6 09:19:35 2014
@@ -34,6 +34,8 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.http.ConnectionClosedException;
 import org.apache.http.ConnectionReuseStrategy;
@@ -54,8 +56,8 @@ import org.apache.http.util.Args;
 import org.apache.http.util.Asserts;
 
 /**
- * Basic implementation of {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler}
that executes
- * a single HTTP request / response exchange.
+ * Pipelining implementation of {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler}
+ * that executes a series of pipelined HTTP requests.
  *
  * @param <T> the result type of request execution.
  * @since 4.4
@@ -73,10 +75,10 @@ public class PipeliningClientExchangeHan
     private final HttpProcessor httppocessor;
     private final ConnectionReuseStrategy connReuseStrategy;
 
-    private volatile HttpAsyncRequestProducer requestProducer;
-    private volatile HttpAsyncResponseConsumer<T> responseConsumer;
-    private volatile boolean keepAlive;
-    private volatile boolean done;
+    private final AtomicReference<HttpAsyncRequestProducer> requestProducerRef;
+    private final AtomicReference<HttpAsyncResponseConsumer<T>> responseConsumerRef;
+    private final AtomicBoolean keepAlive;
+    private final AtomicBoolean closed;
 
     /**
      * Creates new instance of {@code PipeliningClientExchangeHandler}.
@@ -113,6 +115,10 @@ public class PipeliningClientExchangeHan
         this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
             DefaultConnectionReuseStrategy.INSTANCE;
         this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, this.conn);
+        this.requestProducerRef = new AtomicReference<HttpAsyncRequestProducer>(null);
+        this.responseConsumerRef = new AtomicReference<HttpAsyncResponseConsumer<T>>(null);
+        this.keepAlive = new AtomicBoolean(false);
+        this.closed = new AtomicBoolean(false);
     }
 
     /**
@@ -147,10 +153,8 @@ public class PipeliningClientExchangeHan
     }
 
     private void releaseResources() {
-        closeQuietly(this.requestProducer);
-        this.requestProducer = null;
-        closeQuietly(this.responseConsumer);
-        this.responseConsumer = null;
+        closeQuietly(this.requestProducerRef.getAndSet(null));
+        closeQuietly(this.responseConsumerRef.getAndSet(null));
         while (!this.requestProducerQueue.isEmpty()) {
             closeQuietly(this.requestProducerQueue.remove());
         }
@@ -163,20 +167,23 @@ public class PipeliningClientExchangeHan
 
     @Override
     public void close() throws IOException {
-        releaseResources();
-        if (!this.future.isDone()) {
-            this.future.cancel();
+        if (this.closed.compareAndSet(false, true)) {
+            releaseResources();
+            if (!this.future.isDone()) {
+                this.future.cancel();
+            }
         }
     }
 
     @Override
     public HttpRequest generateRequest() throws IOException, HttpException {
-        Asserts.check(this.requestProducer == null, "Inconsistent state: request producer
is not null");
-        this.requestProducer = this.requestProducerQueue.poll();
-        if (this.requestProducer == null) {
+        Asserts.check(this.requestProducerRef.get() == null, "Inconsistent state: request
producer is not null");
+        final HttpAsyncRequestProducer requestProducer = this.requestProducerQueue.poll();
+        if (requestProducer == null) {
             return null;
         }
-        final HttpRequest request = this.requestProducer.generateRequest();
+        this.requestProducerRef.set(requestProducer);
+        final HttpRequest request = requestProducer.generateRequest();
         this.httppocessor.process(request, this.localContext);
         this.requestQueue.add(request);
         return request;
@@ -185,23 +192,25 @@ public class PipeliningClientExchangeHan
     @Override
     public void produceContent(
             final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
-        Asserts.check(this.requestProducer != null, "Inconsistent state: request producer
is null");
-        this.requestProducer.produceContent(encoder, ioctrl);
+        final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
+        Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
+        requestProducer.produceContent(encoder, ioctrl);
     }
 
     @Override
     public void requestCompleted() {
-        Asserts.check(this.requestProducer != null, "Inconsistent state: request producer
is null");
-        this.requestProducer.requestCompleted(this.localContext);
-        this.requestProducer = null;
+        final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.getAndSet(null);
+        Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
+        requestProducer.requestCompleted(this.localContext);
     }
 
     @Override
     public void responseReceived(final HttpResponse response) throws IOException, HttpException
{
-        Asserts.check(this.responseConsumer == null, "Inconsistent state: response consumer
is not null");
+        Asserts.check(this.responseConsumerRef.get() == null, "Inconsistent state: response
consumer is not null");
 
-        this.responseConsumer = this.responseConsumerQueue.poll();
-        Asserts.check(this.responseConsumer != null, "Inconsistent state: response consumer
queue is empty");
+        final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerQueue.poll();
+        Asserts.check(responseConsumer != null, "Inconsistent state: response consumer queue
is empty");
+        this.responseConsumerRef.set(responseConsumer);
 
         final HttpRequest request = this.requestQueue.poll();
         Asserts.check(request != null, "Inconsistent state: request queue is empty");
@@ -210,28 +219,29 @@ public class PipeliningClientExchangeHan
         this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
         this.httppocessor.process(response, this.localContext);
 
-        this.responseConsumer.responseReceived(response);
-        this.keepAlive = this.connReuseStrategy.keepAlive(response, this.localContext);
+        responseConsumer.responseReceived(response);
+        this.keepAlive.set(this.connReuseStrategy.keepAlive(response, this.localContext));
     }
 
     @Override
     public void consumeContent(
             final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
-        Asserts.check(this.responseConsumer != null, "Inconsistent state: response consumer
is null");
-        this.responseConsumer.consumeContent(decoder, ioctrl);
+        final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
+        Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is
null");
+        responseConsumer.consumeContent(decoder, ioctrl);
     }
 
     @Override
     public void responseCompleted() throws IOException {
-        Asserts.check(this.responseConsumer != null, "Inconsistent state: response consumer
is null");
+        final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.getAndSet(null);
+        Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is
null");
         try {
-            if (!this.keepAlive) {
+            if (!this.keepAlive.get()) {
                 this.conn.close();
             }
-            this.responseConsumer.responseCompleted(this.localContext);
-            final T result = this.responseConsumer.getResult();
-            final Exception ex = this.responseConsumer.getException();
-            this.responseConsumer = null;
+            responseConsumer.responseCompleted(this.localContext);
+            final T result = responseConsumer.getResult();
+            final Exception ex = responseConsumer.getException();
             if (result != null) {
                 this.resultQueue.add(result);
             } else {
@@ -239,8 +249,9 @@ public class PipeliningClientExchangeHan
                 this.conn.shutdown();
             }
             if (!conn.isOpen()) {
-                this.done = true;
-                releaseResources();
+                if (this.closed.compareAndSet(false, true)) {
+                    releaseResources();
+                }
             }
             if (!this.future.isDone() && this.responseConsumerQueue.isEmpty()) {
                 this.future.completed(new ArrayList<T>(this.resultQueue));
@@ -259,40 +270,43 @@ public class PipeliningClientExchangeHan
 
     @Override
     public void failed(final Exception ex) {
-        this.done = true;
-        try {
-            if (this.requestProducer != null) {
-                this.requestProducer.failed(ex);
-            }
-            if (this.responseConsumer != null) {
-                this.responseConsumer.failed(ex);
-            }
-        } finally {
+        if (this.closed.compareAndSet(false, true)) {
             try {
-                this.future.failed(ex);
+                final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
+                if (requestProducer != null) {
+                    requestProducer.failed(ex);
+                }
+                final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
+                if (responseConsumer != null) {
+                    responseConsumer.failed(ex);
+                }
             } finally {
-                releaseResources();
+                try {
+                    this.future.failed(ex);
+                } finally {
+                    releaseResources();
+                }
             }
         }
     }
 
     @Override
     public boolean cancel() {
-        this.done = true;
-        try {
-            final boolean cancelled = this.responseConsumer.cancel();
-            this.future.cancel();
-            releaseResources();
-            return cancelled;
-        } catch (final RuntimeException ex) {
-            failed(ex);
-            throw ex;
+        if (this.closed.compareAndSet(false, true)) {
+            try {
+                this.future.cancel();
+                final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
+                return responseConsumer != null && responseConsumer.cancel();
+            } finally {
+                releaseResources();
+            }
         }
+        return false;
     }
 
     @Override
     public boolean isDone() {
-        return this.done;
+        return this.future.isDone();
     }
 
 }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestBasicAsyncClientExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestBasicAsyncClientExchangeHandler.java?rev=1629597&r1=1629596&r2=1629597&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestBasicAsyncClientExchangeHandler.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestBasicAsyncClientExchangeHandler.java
Mon Oct  6 09:19:35 2014
@@ -29,8 +29,6 @@ package org.apache.http.nio.protocol;
 
 import java.util.concurrent.ExecutionException;
 
-import org.junit.Assert;
-
 import org.apache.http.ConnectionClosedException;
 import org.apache.http.ConnectionReuseStrategy;
 import org.apache.http.HttpRequest;
@@ -45,6 +43,7 @@ import org.apache.http.protocol.HttpCont
 import org.apache.http.protocol.HttpCoreContext;
 import org.apache.http.protocol.HttpProcessor;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -279,23 +278,6 @@ public class TestBasicAsyncClientExchang
     }
 
     @Test
-    public void testCancelWithException() throws Exception {
-        Mockito.doThrow(new RuntimeException()).when(this.responseConsumer).cancel();
-        try {
-            this.exchangeHandler.cancel();
-            Assert.fail("RuntimeException expected");
-        } catch (final RuntimeException ex) {
-            Mockito.verify(this.requestProducer).close();
-            Mockito.verify(this.responseConsumer).close();
-            try {
-                this.exchangeHandler.getFuture().get();
-                Assert.fail("ExecutionException expected");
-            } catch (final ExecutionException exex) {
-            }
-        }
-    }
-
-    @Test
     public void testResponseCompleted() throws Exception {
         final Object obj = new Object();
         Mockito.when(this.responseConsumer.getResult()).thenReturn(obj);



Mime
View raw message