hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1531544 - in /httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client: DefaultClientExchangeHandlerImpl.java MinimalClientExchangeHandlerImpl.java
Date Sat, 12 Oct 2013 15:23:55 GMT
Author: olegk
Date: Sat Oct 12 15:23:55 2013
New Revision: 1531544

URL: http://svn.apache.org/r1531544
Log:
HTTPASYNC-56: Removed method level synchronization in MinimalClientExchangeHandlerImpl and
DefaultClientExchangeHandlerImpl. Instances of both classes are expected to be accessed by
one thread at a time. Only #cancel method can be used safely by multiple threads concurrently.

Modified:
    httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java
    httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java

Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java?rev=1531544&r1=1531543&r2=1531544&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java
(original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java
Sat Oct 12 15:23:55 2013
@@ -28,6 +28,8 @@ package org.apache.http.impl.nio.client;
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.http.ConnectionClosedException;
@@ -54,6 +56,12 @@ import org.apache.http.nio.protocol.Http
 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
 import org.apache.http.protocol.HttpCoreContext;
 
+/**
+ * Default implementation of {@link HttpAsyncClientExchangeHandler}.
+ * <p/>
+ * Instances of this class are expected to be accessed by one thread at a time only.
+ * The {@link #cancel()} method can be called concurrently by multiple threads.
+ */
 class DefaultClientExchangeHandlerImpl<T>
     implements HttpAsyncClientExchangeHandler, InternalConnManager, Cancellable {
 
@@ -66,11 +74,9 @@ class DefaultClientExchangeHandlerImpl<T
     private final NHttpClientConnectionManager connmgr;
     private final InternalClientExec exec;
     private final InternalState state;
-
-    private volatile boolean closed;
-    private volatile boolean completed;
-
-    private volatile NHttpClientConnection managedConn;
+    private final AtomicReference<NHttpClientConnection> managedConn;
+    private final AtomicBoolean closed;
+    private final AtomicBoolean completed;
 
     public DefaultClientExchangeHandlerImpl(
             final Log log,
@@ -89,13 +95,15 @@ class DefaultClientExchangeHandlerImpl<T
         this.connmgr = connmgr;
         this.exec = exec;
         this.state = new InternalState(requestProducer, responseConsumer, localContext);
+        this.closed = new AtomicBoolean(false);
+        this.completed = new AtomicBoolean(false);
+        this.managedConn = new AtomicReference<NHttpClientConnection>(null);
     }
 
     public void close() {
-        if (this.closed) {
+        if (this.closed.getAndSet(true)) {
             return;
         }
-        this.closed = true;
         abortConnection();
         try {
             this.requestProducer.close();
@@ -109,7 +117,7 @@ class DefaultClientExchangeHandlerImpl<T
         }
     }
 
-    public synchronized void start() throws HttpException, IOException {
+    public void start() throws HttpException, IOException {
         final HttpHost target = this.requestProducer.getTarget();
         final HttpRequest original = this.requestProducer.generateRequest();
 
@@ -121,14 +129,14 @@ class DefaultClientExchangeHandlerImpl<T
     }
 
     public boolean isDone() {
-        return this.completed;
+        return this.completed.get();
     }
 
-    public synchronized HttpRequest generateRequest() throws IOException, HttpException {
+    public HttpRequest generateRequest() throws IOException, HttpException {
         return this.exec.generateRequest(this.state, this);
     }
 
-    public synchronized void produceContent(
+    public void produceContent(
             final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
         this.exec.produceContent(this.state, encoder, ioctrl);
     }
@@ -137,19 +145,19 @@ class DefaultClientExchangeHandlerImpl<T
         this.exec.requestCompleted(this.state);
     }
 
-    public synchronized void responseReceived(
+    public void responseReceived(
             final HttpResponse response) throws IOException, HttpException {
         this.exec.responseReceived(this.state, response);
     }
 
-    public synchronized void consumeContent(
+    public void consumeContent(
             final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
         this.exec.consumeContent(this.state, decoder, ioctrl);
     }
 
-    public synchronized void responseCompleted() throws IOException, HttpException {
+    public void responseCompleted() throws IOException, HttpException {
         if (this.resultFuture.isDone()) {
-            this.completed = true;
+            this.completed.set(true);
             releaseConnection();
             return;
         }
@@ -164,13 +172,14 @@ class DefaultClientExchangeHandlerImpl<T
             } else {
                 this.resultFuture.failed(ex);
             }
-            this.completed = true;
+            this.completed.set(true);
         } else {
-            if (this.managedConn != null &&!this.managedConn.isOpen()) {
+            final NHttpClientConnection localConn = this.managedConn.get();
+            if (localConn != null &&!localConn.isOpen()) {
                 releaseConnection();
             }
-            if (this.managedConn != null) {
-                this.managedConn.requestOutput();
+            if (localConn != null) {
+                localConn.requestOutput();
             } else {
                 requestConnection();
             }
@@ -178,36 +187,33 @@ class DefaultClientExchangeHandlerImpl<T
     }
 
     public void inputTerminated() {
-        if (!this.completed) {
+        if (!this.completed.get()) {
             requestConnection();
         } else {
             close();
         }
     }
 
-    public synchronized void releaseConnection() {
-        if (this.managedConn != null) {
-            try {
-                if (this.log.isDebugEnabled()) {
-                    this.log.debug("[exchange: " + this.state.getId() + "] releasing connection");
-                }
-                this.managedConn.getContext().removeAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER);
-                if (this.state.isReusable()) {
-                    this.connmgr.releaseConnection(this.managedConn,
+    public void releaseConnection() {
+        final NHttpClientConnection localConn = this.managedConn.getAndSet(null);
+        if (localConn != null) {
+            if (this.log.isDebugEnabled()) {
+                this.log.debug("[exchange: " + this.state.getId() + "] releasing connection");
+            }
+            localConn.getContext().removeAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER);
+            if (this.state.isReusable()) {
+                this.connmgr.releaseConnection(localConn,
                         this.localContext.getUserToken(),
                         this.state.getValidDuration(), TimeUnit.MILLISECONDS);
-                    this.managedConn = null;
-                }
-            } finally {
-                abortConnection();
             }
         }
     }
 
-    public synchronized void abortConnection() {
-        if (this.managedConn != null) {
+    public void abortConnection() {
+        final NHttpClientConnection localConn = this.managedConn.getAndSet(null);
+        if (localConn != null) {
             try {
-                this.managedConn.shutdown();
+                localConn.shutdown();
                 if (this.log.isDebugEnabled()) {
                     this.log.debug("[exchange: " + this.state.getId() + "] connection discarded");
                 }
@@ -216,14 +222,12 @@ class DefaultClientExchangeHandlerImpl<T
                     this.log.debug(ex.getMessage(), ex);
                 }
             } finally {
-                this.connmgr.releaseConnection(
-                        this.managedConn, null, 0, TimeUnit.MILLISECONDS);
+                this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
             }
         }
-        this.managedConn = null;
     }
 
-    public synchronized void failed(final Exception ex) {
+    public void failed(final Exception ex) {
         try {
             this.requestProducer.failed(ex);
             this.responseConsumer.failed(ex);
@@ -236,7 +240,7 @@ class DefaultClientExchangeHandlerImpl<T
         }
     }
 
-    public synchronized boolean cancel() {
+    public boolean cancel() {
         if (this.log.isDebugEnabled()) {
             this.log.debug("[exchange: " + this.state.getId() + "] Cancelled");
         }
@@ -261,16 +265,16 @@ class DefaultClientExchangeHandlerImpl<T
         }
     }
 
-    private synchronized void connectionAllocated(final NHttpClientConnection managedConn)
{
+    private void connectionAllocated(final NHttpClientConnection managedConn) {
         try {
             if (this.log.isDebugEnabled()) {
                 this.log.debug("[exchange: " + this.state.getId() + "] Connection allocated:
" + managedConn);
             }
-            this.managedConn = managedConn;
+            this.managedConn.set(managedConn);
             this.state.setValidDuration(0);
             this.state.setNonReusable();
 
-            if (this.closed) {
+            if (this.closed.get()) {
                 releaseConnection();
                 return;
             }
@@ -284,12 +288,12 @@ class DefaultClientExchangeHandlerImpl<T
             if (!managedConn.isOpen()) {
                 failed(new ConnectionClosedException("Connection closed"));
             } else {
-                this.managedConn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER,
this);
-                this.managedConn.requestOutput();
+                managedConn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER,
this);
                 final RequestConfig config = this.localContext.getRequestConfig();
                 if (config.getSocketTimeout() > 0) {
-                    this.managedConn.setSocketTimeout(config.getSocketTimeout());
+                    managedConn.setSocketTimeout(config.getSocketTimeout());
                 }
+                managedConn.requestOutput();
             }
         } catch (final RuntimeException runex) {
             failed(runex);
@@ -297,7 +301,7 @@ class DefaultClientExchangeHandlerImpl<T
         }
     }
 
-    private synchronized void connectionRequestFailed(final Exception ex) {
+    private void connectionRequestFailed(final Exception ex) {
         if (this.log.isDebugEnabled()) {
             this.log.debug("[exchange: " + this.state.getId() + "] connection request failed");
         }
@@ -308,7 +312,7 @@ class DefaultClientExchangeHandlerImpl<T
         }
     }
 
-    private synchronized void connectionRequestCancelled() {
+    private void connectionRequestCancelled() {
         if (this.log.isDebugEnabled()) {
             this.log.debug("[exchange: " + this.state.getId() + "] Connection request cancelled");
         }
@@ -351,7 +355,7 @@ class DefaultClientExchangeHandlerImpl<T
     }
 
     public NHttpClientConnection getConnection() {
-        return this.managedConn;
+        return this.managedConn.get();
     }
 
 }

Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java?rev=1531544&r1=1531543&r2=1531544&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java
(original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java
Sat Oct 12 15:23:55 2013
@@ -28,7 +28,9 @@ package org.apache.http.impl.nio.client;
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.http.ConnectionClosedException;
@@ -59,6 +61,12 @@ import org.apache.http.nio.protocol.Http
 import org.apache.http.protocol.HttpCoreContext;
 import org.apache.http.protocol.HttpProcessor;
 
+/**
+ * Minimal implementation of {@link HttpAsyncClientExchangeHandler}.
+ * <p/>
+ * Instances of this class are expected to be accessed by one thread at a time only.
+ * The {@link #cancel()} method can be called concurrently by multiple threads.
+ */
 class MinimalClientExchangeHandlerImpl<T>
     implements HttpAsyncClientExchangeHandler, Cancellable {
 
@@ -75,16 +83,15 @@ class MinimalClientExchangeHandlerImpl<T
     private final HttpProcessor httpProcessor;
     private final ConnectionReuseStrategy connReuseStrategy;
     private final ConnectionKeepAliveStrategy keepaliveStrategy;
-
-    private volatile boolean closed;
-    private volatile boolean completed;
-
-    private HttpRoute route;
-    private NHttpClientConnection managedConn;
-    private HttpRequestWrapper request;
-    private HttpResponse response;
-    private boolean reusable;
-    private long validDuration;
+    private final AtomicReference<NHttpClientConnection> managedConn;
+    private final AtomicBoolean closed;
+    private final AtomicBoolean completed;
+
+    private volatile HttpRoute route;
+    private volatile HttpRequestWrapper request;
+    private volatile HttpResponse response;
+    private volatile boolean reusable;
+    private volatile long validDuration;
 
     public MinimalClientExchangeHandlerImpl(
             final Log log,
@@ -107,13 +114,15 @@ class MinimalClientExchangeHandlerImpl<T
         this.httpProcessor = httpProcessor;
         this.connReuseStrategy = connReuseStrategy;
         this.keepaliveStrategy = keepaliveStrategy;
+        this.closed = new AtomicBoolean(false);
+        this.completed = new AtomicBoolean(false);
+        this.managedConn = new AtomicReference<NHttpClientConnection>(null);
     }
 
-    public synchronized void close() {
-        if (this.closed) {
+    public void close() {
+        if (this.closed.getAndSet(true)) {
             return;
         }
-        this.closed = true;
         abortConnection();
         try {
             this.requestProducer.close();
@@ -127,7 +136,7 @@ class MinimalClientExchangeHandlerImpl<T
         }
     }
 
-    public synchronized void start() throws HttpException, IOException {
+    public void start() throws HttpException, IOException {
         if (this.log.isDebugEnabled()) {
             this.log.debug("[exchange: " + this.id + "] Start execution");
         }
@@ -155,18 +164,19 @@ class MinimalClientExchangeHandlerImpl<T
     }
 
     public boolean isDone() {
-        return this.completed;
+        return this.completed.get();
     }
 
-    public synchronized HttpRequest generateRequest() throws IOException, HttpException {
-        if (!this.connmgr.isRouteComplete(this.managedConn)) {
-            this.connmgr.startRoute(this.managedConn, this.route, this.localContext);
-            this.connmgr.routeComplete(this.managedConn, this.route, this.localContext);
+    public HttpRequest generateRequest() throws IOException, HttpException {
+        final NHttpClientConnection localConn = this.managedConn.get();
+        if (!this.connmgr.isRouteComplete(localConn)) {
+            this.connmgr.startRoute(localConn, this.route, this.localContext);
+            this.connmgr.routeComplete(localConn, this.route, this.localContext);
         }
         return this.request;
     }
 
-    public synchronized void produceContent(
+    public void produceContent(
             final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
         if (this.log.isDebugEnabled()) {
             this.log.debug("[exchange: " + this.id + "] Produce content");
@@ -177,14 +187,14 @@ class MinimalClientExchangeHandlerImpl<T
         }
     }
 
-    public synchronized void requestCompleted() {
+    public void requestCompleted() {
         if (this.log.isDebugEnabled()) {
             this.log.debug("[exchange: " + this.id + "] Request completed");
         }
         this.requestProducer.requestCompleted(this.localContext);
     }
 
-    public synchronized void responseReceived(
+    public void responseReceived(
             final HttpResponse response) throws IOException, HttpException {
         if (this.log.isDebugEnabled()) {
             this.log.debug("[exchange: " + this.id + "] Response received " + response.getStatusLine());
@@ -197,7 +207,7 @@ class MinimalClientExchangeHandlerImpl<T
         this.response = response;
     }
 
-    public synchronized void consumeContent(
+    public void consumeContent(
             final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
         if (this.log.isDebugEnabled()) {
             this.log.debug("[exchange: " + this.id + "] Consume content");
@@ -205,9 +215,9 @@ class MinimalClientExchangeHandlerImpl<T
         this.responseConsumer.consumeContent(decoder, ioctrl);
     }
 
-    public synchronized void responseCompleted() throws IOException, HttpException {
+    public void responseCompleted() throws IOException, HttpException {
         if (this.resultFuture.isDone()) {
-            this.completed = true;
+            this.completed.set(true);
             releaseConnection();
             return;
         }
@@ -245,14 +255,14 @@ class MinimalClientExchangeHandlerImpl<T
         } else {
             this.resultFuture.failed(ex);
         }
-        this.completed = true;
+        this.completed.set(true);
     }
 
     public void inputTerminated() {
         close();
     }
 
-    public synchronized void failed(final Exception ex) {
+    public void failed(final Exception ex) {
         try {
             this.requestProducer.failed(ex);
             this.responseConsumer.failed(ex);
@@ -265,7 +275,7 @@ class MinimalClientExchangeHandlerImpl<T
         }
     }
 
-    public synchronized boolean cancel() {
+    public boolean cancel() {
         if (this.log.isDebugEnabled()) {
             this.log.debug("[exchange: " + this.id + "] Cancelled");
         }
@@ -290,14 +300,14 @@ class MinimalClientExchangeHandlerImpl<T
         }
     }
 
-    private synchronized void connectionAllocated(final NHttpClientConnection managedConn)
{
+    private void connectionAllocated(final NHttpClientConnection managedConn) {
         try {
             if (this.log.isDebugEnabled()) {
                 this.log.debug("[exchange: " + this.id + "] Connection allocated: " + managedConn);
             }
-            this.managedConn = managedConn;
+            this.managedConn.set(managedConn);
 
-            if (this.closed) {
+            if (this.closed.get()) {
                 releaseConnection();
                 return;
             }
@@ -307,8 +317,12 @@ class MinimalClientExchangeHandlerImpl<T
             if (!managedConn.isOpen()) {
                 failed(new ConnectionClosedException("Connection closed"));
             } else {
-                this.managedConn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER,
this);
-                this.managedConn.requestOutput();
+                managedConn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER,
this);
+                final RequestConfig config = this.localContext.getRequestConfig();
+                if (config.getSocketTimeout() > 0) {
+                    managedConn.setSocketTimeout(config.getSocketTimeout());
+                }
+                managedConn.requestOutput();
             }
         } catch (final RuntimeException runex) {
             failed(runex);
@@ -316,7 +330,7 @@ class MinimalClientExchangeHandlerImpl<T
         }
     }
 
-    private synchronized void connectionRequestFailed(final Exception ex) {
+    private void connectionRequestFailed(final Exception ex) {
         if (this.log.isDebugEnabled()) {
             this.log.debug("[exchange: " + this.id + "] Connection request failed");
         }
@@ -327,7 +341,7 @@ class MinimalClientExchangeHandlerImpl<T
         }
     }
 
-    private synchronized void connectionRequestCancelled() {
+    private void connectionRequestCancelled() {
         if (this.log.isDebugEnabled()) {
             this.log.debug("[exchange: " + this.id + "] Connection request cancelled");
         }
@@ -368,28 +382,25 @@ class MinimalClientExchangeHandlerImpl<T
     }
 
     private void releaseConnection() {
-        if (this.managedConn != null) {
-            try {
-                if (this.log.isDebugEnabled()) {
-                    this.log.debug("[exchange: " + this.id + "] Releasing connection");
-                }
-                this.managedConn.getContext().removeAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER);
-                if (this.reusable) {
-                    this.connmgr.releaseConnection(this.managedConn,
+        final NHttpClientConnection localConn = this.managedConn.getAndSet(null);
+        if (localConn != null) {
+            if (this.log.isDebugEnabled()) {
+                this.log.debug("[exchange: " + this.id + "] Releasing connection");
+            }
+            localConn.getContext().removeAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER);
+            if (this.reusable) {
+                this.connmgr.releaseConnection(localConn,
                         this.localContext.getUserToken(),
                         this.validDuration, TimeUnit.MILLISECONDS);
-                    this.managedConn = null;
-                }
-            } finally {
-                abortConnection();
             }
         }
     }
 
     private void abortConnection() {
-        if (this.managedConn != null) {
+        final NHttpClientConnection localConn = this.managedConn.getAndSet(null);
+        if (localConn != null) {
             try {
-                this.managedConn.shutdown();
+                localConn.shutdown();
                 if (this.log.isDebugEnabled()) {
                     this.log.debug("[exchange: " + this.id + "] Connection discarded");
                 }
@@ -398,11 +409,9 @@ class MinimalClientExchangeHandlerImpl<T
                     this.log.debug(ex.getMessage(), ex);
                 }
             } finally {
-                this.connmgr.releaseConnection(
-                        this.managedConn, null, 0, TimeUnit.MILLISECONDS);
+                this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
             }
         }
-        this.managedConn = null;
     }
 
 }



Mime
View raw message