hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1165005 - in /httpcomponents/httpcore/trunk: httpcore-nio/src/main/java/org/apache/http/nio/protocol/ httpcore/src/main/java/org/apache/http/pool/
Date Sun, 04 Sep 2011 12:23:02 GMT
Author: olegk
Date: Sun Sep  4 12:23:01 2011
New Revision: 1165005

URL: http://svn.apache.org/viewvc?rev=1165005&view=rev
Log:
HTTP async request executor to execute requests using a connection pool

Modified:
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestExecutor.java
    httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java
    httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/ConnPool.java
    httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/PoolEntryFuture.java

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestExecutor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestExecutor.java?rev=1165005&r1=1165004&r2=1165005&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestExecutor.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestExecutor.java
Sun Sep  4 12:23:01 2011
@@ -26,13 +26,17 @@
  */
 package org.apache.http.nio.protocol;
 
+import java.io.IOException;
 import java.util.concurrent.Future;
 
 import org.apache.http.ConnectionReuseStrategy;
+import org.apache.http.HttpHost;
 import org.apache.http.concurrent.BasicFuture;
 import org.apache.http.concurrent.FutureCallback;
 import org.apache.http.nio.NHttpClientConnection;
 import org.apache.http.params.HttpParams;
+import org.apache.http.pool.ConnPool;
+import org.apache.http.pool.PoolEntry;
 import org.apache.http.protocol.BasicHttpContext;
 import org.apache.http.protocol.HttpContext;
 import org.apache.http.protocol.HttpProcessor;
@@ -89,4 +93,157 @@ public class HttpAsyncRequestExecutor {
         return execute(requestProducer, responseConsumer, conn, new BasicHttpContext());
     }
 
+    public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T>
execute(
+            final HttpAsyncRequestProducer requestProducer,
+            final HttpAsyncResponseConsumer<T> responseConsumer,
+            final ConnPool<HttpHost, E> connPool,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        if (connPool == null) {
+            throw new IllegalArgumentException("HTTP connection pool may not be null");
+        }
+        if (context == null) {
+            throw new IllegalArgumentException("HTTP context may not be null");
+        }
+        BasicFuture<T> future = new BasicFuture<T>(callback);
+        HttpHost target = requestProducer.getTarget();
+        connPool.lease(target, null, new ConnRequestCallback<T, E>(
+                future, requestProducer, responseConsumer, connPool, context));
+        return future;
+    }
+
+    public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T>
execute(
+            final HttpAsyncRequestProducer requestProducer,
+            final HttpAsyncResponseConsumer<T> responseConsumer,
+            final ConnPool<HttpHost, E> connPool,
+            final HttpContext context) {
+        return execute(requestProducer, responseConsumer, connPool, context);
+    }
+
+    public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T>
execute(
+            final HttpAsyncRequestProducer requestProducer,
+            final HttpAsyncResponseConsumer<T> responseConsumer,
+            final ConnPool<HttpHost, E> connPool) {
+        return execute(requestProducer, responseConsumer, connPool, new BasicHttpContext());
+    }
+
+    class ConnRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>>
implements FutureCallback<E> {
+
+        private final BasicFuture<T> future;
+        private final HttpAsyncRequestProducer requestProducer;
+        private final HttpAsyncResponseConsumer<T> responseConsumer;
+        private final ConnPool<HttpHost, E> connPool;
+        private final HttpContext context;
+
+        ConnRequestCallback(
+                final BasicFuture<T> future,
+                final HttpAsyncRequestProducer requestProducer,
+                final HttpAsyncResponseConsumer<T> responseConsumer,
+                final ConnPool<HttpHost, E> connPool,
+                final HttpContext context) {
+            super();
+            this.future = future;
+            this.requestProducer = requestProducer;
+            this.responseConsumer = responseConsumer;
+            this.connPool = connPool;
+            this.context = context;
+        }
+
+        public void completed(final E result) {
+            if (this.future.isDone()) {
+                this.connPool.release(result, true);
+                return;
+            }
+            NHttpClientConnection conn = result.getConnection();
+            HttpAsyncClientExchangeHandler<T> handler = new HttpAsyncClientExchangeHandlerImpl<T>(
+                    this.future, this.requestProducer, this.responseConsumer, this.context,
+                    httppocessor, conn, reuseStrategy, params);
+            conn.getContext().setAttribute(HttpAsyncClientProtocolHandler.HTTP_HANDLER, handler);
+            conn.requestOutput();
+        }
+
+        public void failed(final Exception ex) {
+            try {
+                try {
+                    this.responseConsumer.failed(ex);
+                } finally {
+                    releaseResources();
+                }
+            } finally {
+                this.future.failed(ex);
+            }
+        }
+
+        public void cancelled() {
+            try {
+                try {
+                    this.responseConsumer.cancel();
+                } finally {
+                    releaseResources();
+                }
+            } finally {
+                this.future.cancel(true);
+            }
+        }
+
+        public void releaseResources() {
+            try {
+                this.requestProducer.close();
+            } catch (IOException ioex) {
+                onException(ioex);
+            }
+            try {
+                this.responseConsumer.close();
+            } catch (IOException ioex) {
+                onException(ioex);
+            }
+        }
+
+    }
+
+    class RequestExecutionCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>>
implements FutureCallback<T> {
+
+        private final BasicFuture<T> future;
+        private final E poolEntry;
+        private final ConnPool<HttpHost, E> connPool;
+
+        RequestExecutionCallback(
+                final BasicFuture<T> future,
+                final E poolEntry,
+                final ConnPool<HttpHost, E> connPool) {
+            super();
+            this.future = future;
+            this.poolEntry = poolEntry;
+            this.connPool = connPool;
+        }
+
+        public void completed(final T result) {
+            try {
+                this.connPool.release(this.poolEntry, true);
+            } finally {
+                this.future.completed(result);
+            }
+        }
+
+        public void failed(final Exception ex) {
+            try {
+                this.connPool.release(this.poolEntry, false);
+            } finally {
+                this.future.failed(ex);
+            }
+        }
+
+        public void cancelled() {
+            try {
+                this.connPool.release(this.poolEntry, false);
+            } finally {
+                this.future.cancel(true);
+            }
+        }
+
+    }
+
+    protected void onException(Exception ex) {
+    }
+
 }

Modified: httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java?rev=1165005&r1=1165004&r2=1165005&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java
Sun Sep  4 12:23:01 2011
@@ -41,6 +41,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.http.annotation.ThreadSafe;
+import org.apache.http.concurrent.FutureCallback;
 
 /**
  * Abstract blocking connection pool.
@@ -145,14 +146,14 @@ public abstract class AbstractConnPool<T
         return pool;
     }
 
-    public Future<E> lease(final T route, final Object state) {
+    public Future<E> lease(final T route, final Object state, final FutureCallback<E>
callback) {
         if (route == null) {
             throw new IllegalArgumentException("Route may not be null");
         }
         if (this.isShutDown) {
             throw new IllegalStateException("Connection pool shut down");
         }
-        return new PoolEntryFuture<E>(this.lock) {
+        return new PoolEntryFuture<E>(this.lock, callback) {
 
             @Override
             public E getPoolEntry(
@@ -165,6 +166,10 @@ public abstract class AbstractConnPool<T
         };
     }
 
+    public Future<E> lease(final T route, final Object state) {
+        return lease(route, state, null);
+    }
+
     private E getPoolEntryBlocking(
             final T route, final Object state,
             final long timeout, final TimeUnit tunit,

Modified: httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/ConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/ConnPool.java?rev=1165005&r1=1165004&r2=1165005&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/ConnPool.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/ConnPool.java
Sun Sep  4 12:23:01 2011
@@ -28,6 +28,8 @@ package org.apache.http.pool;
 
 import java.util.concurrent.Future;
 
+import org.apache.http.concurrent.FutureCallback;
+
 /**
  * Abstract connection pool.
  *
@@ -38,7 +40,7 @@ import java.util.concurrent.Future;
  */
 public interface ConnPool<T, E> {
 
-    Future<E> lease(final T route, final Object state);
+    Future<E> lease(final T route, final Object state, final FutureCallback<E>
callback);
 
     void release(E entry, boolean reusable);
 

Modified: httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/PoolEntryFuture.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/PoolEntryFuture.java?rev=1165005&r1=1165004&r2=1165005&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/PoolEntryFuture.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore/src/main/java/org/apache/http/pool/PoolEntryFuture.java
Sun Sep  4 12:23:01 2011
@@ -36,20 +36,23 @@ import java.util.concurrent.locks.Condit
 import java.util.concurrent.locks.Lock;
 
 import org.apache.http.annotation.ThreadSafe;
+import org.apache.http.concurrent.FutureCallback;
 
 @ThreadSafe
 abstract class PoolEntryFuture<T> implements Future<T> {
 
     private final Lock lock;
+    private final FutureCallback<T> callback;
     private final Condition condition;
     private volatile boolean cancelled;
     private volatile boolean completed;
     private T result;
 
-    PoolEntryFuture(final Lock lock) {
+    PoolEntryFuture(final Lock lock, final FutureCallback<T> callback) {
         super();
         this.lock = lock;
         this.condition = lock.newCondition();
+        this.callback = callback;
     }
 
     public boolean cancel(boolean mayInterruptIfRunning) {
@@ -60,6 +63,9 @@ abstract class PoolEntryFuture<T> implem
             }
             this.completed = true;
             this.cancelled = true;
+            if (this.callback != null) {
+                this.callback.cancelled();
+            }
             this.condition.signalAll();
             return true;
         } finally {
@@ -93,10 +99,16 @@ abstract class PoolEntryFuture<T> implem
             }
             this.result = getPoolEntry(timeout, unit);
             this.completed = true;
+            if (this.callback != null) {
+                this.callback.completed(this.result);
+            }
             return result;
         } catch (IOException ex) {
             this.completed = true;
             this.result = null;
+            if (this.callback != null) {
+                this.callback.failed(ex);
+            }
             throw new ExecutionException(ex);
         } finally {
             this.lock.unlock();



Mime
View raw message