hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1772441 [3/4] - in /httpcomponents/httpcore/trunk: httpcore5-ab/src/main/java/org/apache/hc/core5/http/benchmark/ httpcore5-ab/src/test/java/org/apache/hc/core5/http/benchmark/ httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/...
Date Sat, 03 Dec 2016 09:47:15 GMT
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpoint.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpoint.java?rev=1772441&r1=1772440&r2=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpoint.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpoint.java Sat Dec  3 09:47:13 2016
@@ -28,38 +28,114 @@
 package org.apache.hc.core5.http.impl.nio.bootstrap;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hc.core5.annotation.Contract;
 import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.BasicFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.command.ExecutionCommand;
+import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.http.nio.command.ShutdownType;
+import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
 import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Asserts;
 
 /**
- * Client side endpoint that can initiate HTTP message exchanges.
+ * Client endpoint that can be used to initiate HTTP message exchanges.
  *
  * @since 5.0
  */
-@Contract(threading = ThreadingBehavior.SAFE)
-public interface ClientEndpoint extends Closeable {
+@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
+public final class ClientEndpoint implements Closeable {
 
-    void execute(AsyncClientExchangeHandler exchangeHandler, HttpContext context);
+    private final IOSession ioSession;
+    private final AtomicBoolean closed;
 
-    <T> Future<T> execute(
-            AsyncRequestProducer requestProducer,
-            AsyncResponseConsumer<T> responseConsumer,
-            HttpContext context,
-            FutureCallback<T> callback);
-
-    <T> Future<T> execute(
-            AsyncRequestProducer requestProducer,
-            AsyncResponseConsumer<T> responseConsumer,
-            FutureCallback<T> callback);
-
-    void shutdown(ShutdownType shutdownType);
+    public ClientEndpoint(final IOSession ioSession) {
+        super();
+        this.ioSession = ioSession;
+        this.closed = new AtomicBoolean(false);
+    }
+
+    public void execute(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final HttpContext context) {
+        Asserts.check(!closed.get(), "Connection is already closed");
+        final Command executionCommand = new ExecutionCommand(
+                exchangeHandler,
+                context != null ? context : HttpCoreContext.create());
+        ioSession.addLast(executionCommand);
+        if (ioSession.isClosed()) {
+            executionCommand.cancel();
+        }
+    }
+
+    public <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        Asserts.check(!closed.get(), "Connection is already closed");
+        final BasicFuture<T> future = new BasicFuture<>(callback);
+        execute(new BasicClientExchangeHandler<>(requestProducer, responseConsumer,
+                new FutureCallback<T>() {
+
+                    @Override
+                    public void completed(final T result) {
+                        future.completed(result);
+                    }
+
+                    @Override
+                    public void failed(final Exception ex) {
+                        future.failed(ex);
+                    }
+
+                    @Override
+                    public void cancelled() {
+                        future.cancel();
+                    }
+
+                }),
+                context != null ? context : HttpCoreContext.create());
+        return future;
+    }
+
+    public <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final FutureCallback<T> callback) {
+        return execute(requestProducer, responseConsumer, null, callback);
+    }
+
+    public boolean isOpen() {
+        return !closed.get() && !ioSession.isClosed();
+    }
+
+    public void shutdown() {
+        if (closed.compareAndSet(false, true)) {
+            ioSession.shutdown();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+        }
+    }
+
+    @Override
+    public String toString() {
+        return ioSession.toString();
+    }
 
 }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java?rev=1772441&r1=1772440&r2=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java Sat Dec  3 09:47:13 2016
@@ -33,15 +33,20 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.Cancellable;
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.concurrent.FutureWrapper;
+import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.http.ExceptionListener;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.impl.PoolEntryHolder;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.http.nio.command.ShutdownType;
-import org.apache.hc.core5.net.NamedEndpoint;
+import org.apache.hc.core5.pool.ControlledConnPool;
+import org.apache.hc.core5.pool.PoolEntry;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOSession;
-import org.apache.hc.core5.reactor.IOSessionCallback;
 import org.apache.hc.core5.reactor.SessionRequest;
 import org.apache.hc.core5.reactor.SessionRequestCallback;
 import org.apache.hc.core5.util.Args;
@@ -52,65 +57,125 @@ import org.apache.hc.core5.util.Args;
 public class HttpAsyncRequester extends AsyncRequester {
 
     private final IOEventHandlerFactory handlerFactory;
+    private final ControlledConnPool<HttpHost, ClientEndpoint> connPool;
 
-    HttpAsyncRequester(
-            final IOEventHandlerFactory handlerFactory,
+    public HttpAsyncRequester(
             final IOReactorConfig ioReactorConfig,
-            final ExceptionListener exceptionListener) {
-        super(ioReactorConfig, exceptionListener, new IOSessionCallback() {
+            final ExceptionListener exceptionListener,
+            final IOEventHandlerFactory handlerFactory,
+            final ControlledConnPool<HttpHost, ClientEndpoint> connPool) {
+        super(ioReactorConfig, exceptionListener, new Callback<IOSession>() {
 
             @Override
-            public void execute(final IOSession session) throws IOException {
+            public void execute(final IOSession session) {
                 session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
             }
 
         });
         this.handlerFactory = Args.notNull(handlerFactory, "Handler factory");
+        this.connPool = Args.notNull(connPool, "Connection pool");
     }
 
     public void start() throws IOException {
         execute(handlerFactory);
     }
 
-    public Future<ClientEndpoint> connect(
-            final NamedEndpoint remoteEndpoint,
+    public Future<PooledClientEndpoint> connect(
+            final HttpHost host,
             final long timeout,
             final TimeUnit timeUnit,
-            final FutureCallback<ClientEndpoint> callback) throws InterruptedException {
-        Args.notNull(remoteEndpoint, "Remote endpoint");
+            final FutureCallback<PooledClientEndpoint> callback) {
+        Args.notNull(host, "Host");
         Args.notNull(timeUnit, "Time unit");
-        final BasicFuture<ClientEndpoint> future = new BasicFuture<>(callback);
-        requestSession(remoteEndpoint, timeout, timeUnit, new SessionRequestCallback() {
+        final BasicFuture<PooledClientEndpoint> resultFuture = new BasicFuture<>(callback);
+        final Future<PoolEntry<HttpHost, ClientEndpoint>> leaseFuture = connPool.lease(
+                host, null, new FutureCallback<PoolEntry<HttpHost, ClientEndpoint>>() {
 
             @Override
-            public void completed(final SessionRequest request) {
-                final IOSession session = request.getSession();
-                future.completed(new ClientEndpointImpl(session));
+            public void completed(final PoolEntry<HttpHost, ClientEndpoint> poolEntry) {
+                final PoolEntryHolder<HttpHost, ClientEndpoint> poolEntryHolder = new PoolEntryHolder<>(
+                        connPool,
+                        poolEntry,
+                        new Callback<ClientEndpoint>() {
+
+                            @Override
+                            public void execute(final ClientEndpoint clientEndpoint) {
+                                clientEndpoint.shutdown();
+                            }
+
+                        });
+                final ClientEndpoint clientEndpoint = poolEntry.getConnection();
+                if (clientEndpoint != null && !clientEndpoint.isOpen()) {
+                    poolEntry.discardConnection();
+                }
+                if (poolEntry.hasConnection()) {
+                    resultFuture.completed(new PooledClientEndpoint(poolEntryHolder));
+                } else {
+                    requestSession(host, timeout, timeUnit, new SessionRequestCallback() {
+
+                        @Override
+                        public void completed(final SessionRequest request) {
+                            poolEntry.assignConnection(new ClientEndpoint(request.getSession()));
+                            resultFuture.completed(new PooledClientEndpoint(poolEntryHolder));
+                        }
+
+                        @Override
+                        public void failed(final SessionRequest request) {
+                            try {
+                                resultFuture.failed(request.getException());
+                            } finally {
+                                poolEntryHolder.abortConnection();
+                            }
+                        }
+
+                        @Override
+                        public void timeout(final SessionRequest request) {
+                            try {
+                                resultFuture.failed(new SocketTimeoutException("Connect timeout"));
+                            } finally {
+                                poolEntryHolder.abortConnection();
+                            }
+                        }
+
+                        @Override
+                        public void cancelled(final SessionRequest request) {
+                            try {
+                                resultFuture.cancel();
+                            } finally {
+                                poolEntryHolder.abortConnection();
+                            }
+                        }
+
+                    });
+                }
             }
 
             @Override
-            public void failed(final SessionRequest request) {
-                future.failed(request.getException());
+            public void failed(final Exception ex) {
+                resultFuture.failed(ex);
             }
 
             @Override
-            public void timeout(final SessionRequest request) {
-                future.failed(new SocketTimeoutException("Connect timeout"));
+            public void cancelled() {
+                resultFuture.cancel();
             }
 
+        });
+        return new FutureWrapper<>(resultFuture, new Cancellable() {
+
             @Override
-            public void cancelled(final SessionRequest request) {
-                future.cancel();
+            public boolean cancel() {
+                return leaseFuture.cancel(true);
             }
+
         });
-        return future;
     }
 
-    public Future<ClientEndpoint> connect(
-            final NamedEndpoint remoteEndpoint,
+    public Future<PooledClientEndpoint> connect(
+            final HttpHost host,
             final long timeout,
             final TimeUnit timeUnit) throws InterruptedException {
-        return connect(remoteEndpoint, timeout, timeUnit, null);
+        return connect(host, timeout, timeUnit, null);
     }
 
 }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java?rev=1772441&r1=1772440&r2=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java Sat Dec  3 09:47:13 2016
@@ -28,13 +28,13 @@ package org.apache.hc.core5.http.impl.ni
 
 import java.io.IOException;
 
+import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.http.nio.command.ShutdownType;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOSession;
-import org.apache.hc.core5.reactor.IOSessionCallback;
 import org.apache.hc.core5.util.Args;
 
 /**
@@ -44,14 +44,14 @@ public class HttpAsyncServer extends Asy
 
     private final IOEventHandlerFactory handlerFactory;
 
-    HttpAsyncServer(
+    public HttpAsyncServer(
             final IOEventHandlerFactory handlerFactory,
             final IOReactorConfig ioReactorConfig,
             final ExceptionListener exceptionListener) {
-        super(ioReactorConfig, exceptionListener, new IOSessionCallback() {
+        super(ioReactorConfig, exceptionListener, new Callback<IOSession>() {
 
             @Override
-            public void execute(final IOSession session) throws IOException {
+            public void execute(final IOSession session) {
                 session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
             }
 

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/IOReactorExecutor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/IOReactorExecutor.java?rev=1772441&r1=1772440&r2=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/IOReactorExecutor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/IOReactorExecutor.java Sat Dec  3 09:47:13 2016
@@ -35,23 +35,24 @@ import java.util.concurrent.ThreadFactor
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.reactor.AbstractMultiworkerIOReactor;
 import org.apache.hc.core5.reactor.ExceptionEvent;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOReactorStatus;
-import org.apache.hc.core5.reactor.IOSessionCallback;
+import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.Asserts;
 
-abstract class IOReactorExecutor<T extends AbstractMultiworkerIOReactor> {
+abstract class IOReactorExecutor<T extends AbstractMultiworkerIOReactor> implements AutoCloseable {
 
     enum Status { READY, RUNNING, TERMINATED }
 
     private final IOReactorConfig ioReactorConfig;
     private final ExceptionListener exceptionListener;
-    private final IOSessionCallback sessionShutdownCallback;
+    private final Callback<IOSession> sessionShutdownCallback;
     private final ExecutorService executorService;
     private final ThreadFactory workerThreadFactory;
     private final AtomicReference<T> ioReactorRef;
@@ -62,7 +63,7 @@ abstract class IOReactorExecutor<T exten
             final ExceptionListener exceptionListener,
             final ThreadFactory threadFactory,
             final ThreadFactory workerThreadFactory,
-            final IOSessionCallback sessionShutdownCallback) {
+            final Callback<IOSession> sessionShutdownCallback) {
         super();
         this.ioReactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
         this.exceptionListener = exceptionListener;
@@ -135,10 +136,7 @@ abstract class IOReactorExecutor<T exten
         if (status.compareAndSet(Status.RUNNING, Status.TERMINATED)) {
             ioReactor.initiateShutdown();
             if (sessionShutdownCallback != null) {
-                try {
-                    ioReactor.enumSessions(sessionShutdownCallback);
-                } catch (IOException ignore) {
-                }
+                ioReactor.enumSessions(sessionShutdownCallback);
             }
         }
     }
@@ -158,4 +156,9 @@ abstract class IOReactorExecutor<T exten
         }
     }
 
+    @Override
+    public void close() throws Exception {
+        shutdown(5, TimeUnit.SECONDS);
+    }
+
 }

Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/PooledClientEndpoint.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/PooledClientEndpoint.java?rev=1772441&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/PooledClientEndpoint.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/PooledClientEndpoint.java Sat Dec  3 09:47:13 2016
@@ -0,0 +1,181 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.nio.bootstrap;
+
+import java.util.concurrent.Future;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.impl.PoolEntryHolder;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.ResourceHolder;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.Asserts;
+
+/**
+ * Client endpoint leased from a pool of connections.
+ * <p>
+ * Once the endpoint is no longer needed it MUST be released with {@link #releaseResources()}.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
+public final class PooledClientEndpoint implements ResourceHolder {
+
+    private final PoolEntryHolder<HttpHost, ClientEndpoint> poolEntryHolder;
+
+    PooledClientEndpoint(final PoolEntryHolder<HttpHost, ClientEndpoint> poolEntryHolder) {
+        super();
+        this.poolEntryHolder = poolEntryHolder;
+    }
+
+    private ClientEndpoint getClientEndpoint() {
+        final ClientEndpoint endpoint = poolEntryHolder.getConnection();
+        Asserts.check(endpoint != null, "Client endpoint already released");
+        return endpoint;
+    }
+
+    /**
+     * Initiates a message exchange using the given handler.
+     * <p>
+     * Once the endpoint is no longer needed it MUST be released with {@link #releaseResources()}.
+     */
+    public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
+        getClientEndpoint().execute(exchangeHandler, context);
+    }
+
+    /**
+     * Initiates a message exchange using the given request producer and response consumer.
+     * <p>
+     * Once the endpoint is no longer needed it MUST be released with {@link #releaseResources()}.
+     */
+    public <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        return getClientEndpoint().execute(requestProducer, responseConsumer, context, callback);
+    }
+
+    /**
+     * Initiates a message exchange using the given request producer and response consumer.
+     * <p>
+     * Once the endpoint is no longer needed it MUST be released with {@link #releaseResources()}.
+     */
+    public <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final FutureCallback<T> callback) {
+        return execute(requestProducer, responseConsumer, null, callback);
+    }
+
+    /**
+     * Initiates a message exchange using the given request producer and response consumer and
+     * automatically invokes {@link #releaseResources()} upon its completion.
+     */
+    public <T> Future<T> executeAndRelease(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        return getClientEndpoint().execute(requestProducer, responseConsumer, context, new FutureCallback<T>() {
+
+            @Override
+            public void completed(final T result) {
+                try {
+                    if (callback != null) {
+                        callback.completed(result);
+                    }
+                } finally {
+                    releaseResources();
+                }
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                try {
+                    if (callback != null) {
+                        callback.failed(ex);
+                    }
+                } finally {
+                    releaseResources();
+                }
+            }
+
+            @Override
+            public void cancelled() {
+                try {
+                    if (callback != null) {
+                        callback.cancelled();
+                    }
+                } finally {
+                    releaseResources();
+                }
+            }
+
+        });
+    }
+
+    /**
+     * Initiates a message exchange using the given request producer and response consumer and
+     * automatically invokes {@link #releaseResources()} upon its completion.
+     */
+    public <T> Future<T> executeAndRelease(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final FutureCallback<T> callback) {
+        return executeAndRelease(requestProducer, responseConsumer, null, callback);
+    }
+
+    /**
+     * Releases the underlying connection back to the connection pool.
+     */
+    @Override
+    public void releaseResources() {
+        poolEntryHolder.markReusable();
+        poolEntryHolder.releaseConnection();
+    }
+
+    /**
+     * Shuts down the underlying connection and removes it from the connection pool.
+     */
+    public void shutdown() {
+        poolEntryHolder.abortConnection();
+    }
+
+    @Override
+    public String toString() {
+        final ClientEndpoint endpoint = poolEntryHolder.getConnection();
+        return endpoint != null ? endpoint.toString() : "released";
+    }
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/PooledClientEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/PooledClientEndpoint.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/PooledClientEndpoint.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/RequesterBootstrap.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/RequesterBootstrap.java?rev=1772441&r1=1772440&r2=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/RequesterBootstrap.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/RequesterBootstrap.java Sat Dec  3 09:47:13 2016
@@ -26,18 +26,23 @@
  */
 package org.apache.hc.core5.http.impl.nio.bootstrap;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hc.core5.http.ConnectionReuseStrategy;
 import org.apache.hc.core5.http.ExceptionListener;
-import org.apache.hc.core5.http.impl.HttpProcessors;
+import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.config.ConnectionConfig;
+import org.apache.hc.core5.http.impl.ConnectionListener;
 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
 import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
+import org.apache.hc.core5.http.impl.HttpProcessors;
 import org.apache.hc.core5.http.impl.nio.ClientHttp1IOEventHandlerFactory;
-import org.apache.hc.core5.http.impl.nio.ConnectionListener;
 import org.apache.hc.core5.http.impl.nio.DefaultHttpRequestWriterFactory;
 import org.apache.hc.core5.http.impl.nio.DefaultHttpResponseParserFactory;
-import org.apache.hc.core5.http.impl.nio.Http1StreamListener;
 import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.pool.ConnPoolListener;
+import org.apache.hc.core5.pool.StrictConnPool;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 
 /**
@@ -49,9 +54,14 @@ public class RequesterBootstrap {
     private ConnectionConfig connectionConfig;
     private HttpProcessor httpProcessor;
     private ConnectionReuseStrategy connStrategy;
+    private int defaultMaxPerRoute;
+    private int maxTotal;
+    private long timeToLive;
+    private TimeUnit timeUnit;
     private ExceptionListener exceptionListener;
     private ConnectionListener connectionListener;
     private Http1StreamListener streamListener;
+    private ConnPoolListener<HttpHost> connPoolListener;
 
     private RequesterBootstrap() {
     }
@@ -92,6 +102,22 @@ public class RequesterBootstrap {
         return this;
     }
 
+    public final RequesterBootstrap setDefaultMaxPerRoute(final int defaultMaxPerRoute) {
+        this.defaultMaxPerRoute = defaultMaxPerRoute;
+        return this;
+    }
+
+    public final RequesterBootstrap setMaxTotal(final int maxTotal) {
+        this.maxTotal = maxTotal;
+        return this;
+    }
+
+    public final RequesterBootstrap setTimeToLive(final long timeToLive, final TimeUnit timeUnit) {
+        this.timeToLive = timeToLive;
+        this.timeUnit = timeUnit;
+        return this;
+    }
+
     /**
      * Assigns {@link ExceptionListener} instance.
      */
@@ -110,15 +136,23 @@ public class RequesterBootstrap {
 
     /**
      * Assigns {@link Http1StreamListener} instance.
-     *
-     * @since 5.0
      */
     public final RequesterBootstrap setStreamListener(final Http1StreamListener streamListener) {
         this.streamListener = streamListener;
         return this;
     }
 
+    public final RequesterBootstrap setConnPoolListener(final ConnPoolListener<HttpHost> connPoolListener) {
+        this.connPoolListener = connPoolListener;
+        return this;
+    }
+
     public HttpAsyncRequester create() {
+        final StrictConnPool<HttpHost, ClientEndpoint> connPool = new StrictConnPool<>(
+                defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 20,
+                maxTotal > 0 ? maxTotal : 50,
+                timeToLive, timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS,
+                connPoolListener);
         final ClientHttp1IOEventHandlerFactory ioEventHandlerFactory = new ClientHttp1IOEventHandlerFactory(
                 httpProcessor != null ? httpProcessor : HttpProcessors.client(),
                 connectionConfig,
@@ -130,9 +164,10 @@ public class RequesterBootstrap {
                 connectionListener,
                 streamListener);
         return new HttpAsyncRequester(
-                ioEventHandlerFactory,
                 ioReactorConfig,
-                exceptionListener);
+                exceptionListener,
+                ioEventHandlerFactory,
+                connPool);
     }
 
 }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ServerBootstrap.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ServerBootstrap.java?rev=1772441&r1=1772440&r2=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ServerBootstrap.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ServerBootstrap.java Sat Dec  3 09:47:13 2016
@@ -35,13 +35,13 @@ import org.apache.hc.core5.http.impl.Htt
 import org.apache.hc.core5.http.config.ConnectionConfig;
 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
 import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
-import org.apache.hc.core5.http.impl.nio.ConnectionListener;
+import org.apache.hc.core5.http.impl.ConnectionListener;
 import org.apache.hc.core5.http.impl.nio.DefaultHttpRequestParserFactory;
 import org.apache.hc.core5.http.impl.nio.DefaultHttpResponseWriterFactory;
-import org.apache.hc.core5.http.impl.nio.Http1StreamListener;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
 import org.apache.hc.core5.http.impl.nio.ServerHttp1IOEventHandlerFactory;
 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
-import org.apache.hc.core5.http.Supplier;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.nio.support.BasicServerExchangeHandler;
 import org.apache.hc.core5.http.nio.support.RequestConsumerSupplier;
 import org.apache.hc.core5.http.nio.support.ResponseHandler;

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/AbstractHttpEntity.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/AbstractHttpEntity.java?rev=1772441&r1=1772440&r2=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/AbstractHttpEntity.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/AbstractHttpEntity.java Sat Dec  3 09:47:13 2016
@@ -32,7 +32,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hc.core5.http.Header;
-import org.apache.hc.core5.http.Supplier;
+import org.apache.hc.core5.function.Supplier;
 
 /**
  * Abstract base class for mutable entities. Provides the commonly used attributes for streamed and

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/HttpEntityWithTrailers.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/HttpEntityWithTrailers.java?rev=1772441&r1=1772440&r2=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/HttpEntityWithTrailers.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/HttpEntityWithTrailers.java Sat Dec  3 09:47:13 2016
@@ -37,7 +37,7 @@ import java.util.Set;
 
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpEntity;
-import org.apache.hc.core5.http.Supplier;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.util.Args;
 
 /**

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/HttpEntityWrapper.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/HttpEntityWrapper.java?rev=1772441&r1=1772440&r2=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/HttpEntityWrapper.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/io/entity/HttpEntityWrapper.java Sat Dec  3 09:47:13 2016
@@ -35,7 +35,7 @@ import java.util.Set;
 
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpEntity;
-import org.apache.hc.core5.http.Supplier;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.util.Args;
 
 /**

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPool.java?rev=1772441&r1=1772440&r2=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPool.java Sat Dec  3 09:47:13 2016
@@ -26,6 +26,7 @@
  */
 package org.apache.hc.core5.pool;
 
+import java.io.Closeable;
 import java.util.concurrent.Future;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
@@ -36,10 +37,10 @@ import org.apache.hc.core5.concurrent.Fu
  *
  * @param <T> the route type that represents the opposite endpoint of a pooled
  *   connection.
- * @param <E> the type of the pool entry containing a pooled connection.
+ * @param <C> the type of pooled connections.
  * @since 4.2
  */
-public interface ConnPool<T, E> {
+public interface ConnPool<T, C extends Closeable> {
 
     /**
      * Attempts to lease a connection for the given route and with the given
@@ -54,7 +55,7 @@ public interface ConnPool<T, E> {
      *
      * @return future for a leased pool entry.
      */
-    Future<E> lease(final T route, final Object state, final FutureCallback<E> callback);
+    Future<PoolEntry<T, C>> lease(final T route, final Object state, final FutureCallback<PoolEntry<T, C>> callback);
 
     /**
      * Releases the pool entry back to the pool.
@@ -63,6 +64,6 @@ public interface ConnPool<T, E> {
      * @param reusable flag indicating whether or not the released connection
      *   is in a consistent state and is safe for further use.
      */
-    void release(E entry, boolean reusable);
+    void release(PoolEntry<T, C> entry, boolean reusable);
 
 }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolControl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolControl.java?rev=1772441&r1=1772440&r2=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolControl.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolControl.java Sat Dec  3 09:47:13 2016
@@ -26,6 +26,8 @@
  */
 package org.apache.hc.core5.pool;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Interface to control runtime properties of a {@link ConnPool} such as
  * maximum total number of connections or maximum connections per route
@@ -35,7 +37,7 @@ package org.apache.hc.core5.pool;
  *   connection.
  * @since 4.2
  */
-public interface ConnPoolControl<T> {
+public interface ConnPoolControl<T> extends ConnPoolStats<T> {
 
     void setMaxTotal(int max);
 
@@ -49,8 +51,8 @@ public interface ConnPoolControl<T> {
 
     int getMaxPerRoute(final T route);
 
-    PoolStats getTotalStats();
+    void closeIdle(long idletime, TimeUnit tunit);
 
-    PoolStats getStats(final T route);
+    void closeExpired();
 
 }

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolListener.java (from r1772397, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionCallback.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolListener.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolListener.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionCallback.java&r1=1772397&r2=1772441&rev=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionCallback.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolListener.java Sat Dec  3 09:47:13 2016
@@ -24,18 +24,21 @@
  * <http://www.apache.org/>.
  *
  */
+package org.apache.hc.core5.pool;
 
-package org.apache.hc.core5.reactor;
-
-import java.io.IOException;
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
 
 /**
- * Callback for operations with {@link IOSession} instances managed by an I/O reactor..
+ * Connection pool event listener.
  *
  * @since 5.0
  */
-public interface IOSessionCallback {
+@Contract(threading = ThreadingBehavior.STATELESS)
+public interface ConnPoolListener<T> {
+
+    void onLease(T route, ConnPoolStats<T> connPoolStats);
 
-    void execute(IOSession session) throws IOException;
+    void onRelease(T route, ConnPoolStats<T> connPoolStats);
 
 }

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolListener.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolStats.java (from r1772397, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/PoolEntryCallback.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolStats.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolStats.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/PoolEntryCallback.java&r1=1772397&r2=1772441&rev=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/PoolEntryCallback.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolStats.java Sat Dec  3 09:47:13 2016
@@ -27,15 +27,16 @@
 package org.apache.hc.core5.pool;
 
 /**
- * Pool entry callabck.
+ * Interface to obtain connection pool statistics.
  *
  * @param <T> the route type that represents the opposite endpoint of a pooled
  *   connection.
- * @param <C> the connection type.
- * @since 4.3
+ * @since 4.2
  */
-public interface PoolEntryCallback<T, C> {
+public interface ConnPoolStats<T> {
 
-    void process(PoolEntry<T, C> entry);
+    PoolStats getTotalStats();
+
+    PoolStats getStats(final T route);
 
 }

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolStats.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolStats.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ConnPoolStats.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ControlledConnPool.java (from r1772397, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/io/ConnFactory.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ControlledConnPool.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ControlledConnPool.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/io/ConnFactory.java&r1=1772397&r2=1772441&rev=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/io/ConnFactory.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ControlledConnPool.java Sat Dec  3 09:47:13 2016
@@ -24,21 +24,20 @@
  * <http://www.apache.org/>.
  *
  */
+package org.apache.hc.core5.pool;
 
-package org.apache.hc.core5.pool.io;
-
-import java.io.IOException;
+import java.io.Closeable;
 
 /**
- * Factory for poolable blocking connections.
+ * {@link ConnPool} that also extends {@link ConnPoolControl} and {@link AutoCloseable}.
  *
  * @param <T> the route type that represents the opposite endpoint of a pooled
  *   connection.
- * @param <C> the connection type.
+ * @param <C> the type of pooled connections.
  * @since 4.2
  */
-public interface ConnFactory<T, C> {
+public interface ControlledConnPool<T, C extends Closeable> extends ConnPool<T, C>, ConnPoolControl<T>, AutoCloseable {
 
-    C create(T route) throws IOException;
+    void shutdown();
 
 }

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ControlledConnPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ControlledConnPool.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/ControlledConnPool.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/LeaseRequest.java (from r1772397, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/nio/LeaseRequest.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/LeaseRequest.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/LeaseRequest.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/nio/LeaseRequest.java&r1=1772397&r2=1772441&rev=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/nio/LeaseRequest.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/LeaseRequest.java Sat Dec  3 09:47:13 2016
@@ -24,45 +24,42 @@
  * <http://www.apache.org/>.
  *
  */
-package org.apache.hc.core5.pool.nio;
+package org.apache.hc.core5.pool;
 
+import java.io.Closeable;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hc.core5.annotation.Contract;
 import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.concurrent.BasicFuture;
-import org.apache.hc.core5.pool.PoolEntry;
 
 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
-class LeaseRequest<T, C, E extends PoolEntry<T, C>> {
+class LeaseRequest<T, C extends Closeable> {
 
     private final T route;
     private final Object state;
-    private final long connectTimeout;
     private final long deadline;
-    private final BasicFuture<E> future;
+    private final BasicFuture<PoolEntry<T, C>> future;
     private final AtomicBoolean completed;
-    private volatile E result;
+    private volatile PoolEntry<T, C> result;
     private volatile Exception ex;
 
     /**
-     * Contructor
+     * Constructor
+     *
      * @param route route
      * @param state state
-     * @param connectTimeout http connection timeout
      * @param leaseTimeout timeout to wait in a request queue until kicked off
      * @param future future callback
      */
     public LeaseRequest(
             final T route,
             final Object state,
-            final long connectTimeout,
             final long leaseTimeout,
-            final BasicFuture<E> future) {
+            final BasicFuture<PoolEntry<T, C>> future) {
         super();
         this.route = route;
         this.state = state;
-        this.connectTimeout = connectTimeout;
         this.deadline = leaseTimeout > 0 ? System.currentTimeMillis() + leaseTimeout :
                 Long.MAX_VALUE;
         this.future = future;
@@ -77,10 +74,6 @@ class LeaseRequest<T, C, E extends PoolE
         return this.state;
     }
 
-    public long getConnectTimeout() {
-        return this.connectTimeout;
-    }
-
     public long getDeadline() {
         return this.deadline;
     }
@@ -95,17 +88,17 @@ class LeaseRequest<T, C, E extends PoolE
         }
     }
 
-    public void completed(final E result) {
+    public void completed(final PoolEntry<T, C> result) {
         if (this.completed.compareAndSet(false, true)) {
             this.result = result;
         }
     }
 
-    public BasicFuture<E> getFuture() {
+    public BasicFuture<PoolEntry<T, C>> getFuture() {
         return this.future;
     }
 
-    public E getResult() {
+    public PoolEntry<T, C> getResult() {
         return this.result;
     }
 

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/LeaseRequest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/LeaseRequest.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/LeaseRequest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/PoolEntry.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/PoolEntry.java?rev=1772441&r1=1772440&r2=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/PoolEntry.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/PoolEntry.java Sat Dec  3 09:47:13 2016
@@ -26,21 +26,21 @@
  */
 package org.apache.hc.core5.pool;
 
+import static java.lang.System.currentTimeMillis;
+
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.util.Args;
 
 /**
  * Pool entry containing a pool connection object along with its route.
  * <p>
- * The connection contained by the pool entry may have an expiration time which
- * can be either set upon construction time or updated with
- * the {@link #updateExpiry(long, TimeUnit)}.
- * <p>
- * Pool entry may also have an object associated with it that represents
- * a connection state (usually a security principal or a unique token identifying
+ * The connection assigned to this pool entry may have an expiration time and may also have an
+ * object representing a connection state (usually a security principal or a unique token identifying
  * the user whose credentials have been used while establishing the connection).
  *
  * @param <T> the route type that represents the opposite endpoint of a pooled
@@ -48,62 +48,35 @@ import org.apache.hc.core5.util.Args;
  * @param <C> the connection type.
  * @since 4.2
  */
-@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
-public abstract class PoolEntry<T, C> {
+public final class PoolEntry<T, C extends Closeable> {
 
-    private final String id;
     private final T route;
-    private final C conn;
-    private final long created;
-    private final long validityDeadline;
-
-    private long updated;
-
-    private long expiry;
+    private final long timeToLive;
+    private final AtomicReference<C> connRef;
 
     private volatile Object state;
+    private volatile long created;
+    private volatile long updated;
+    private volatile long expiry;
+    private volatile long validityDeadline;
 
     /**
      * Creates new {@code PoolEntry} instance.
      *
-     * @param id unique identifier of the pool entry. May be {@code null}.
      * @param route route to the opposite endpoint.
-     * @param conn the connection.
      * @param timeToLive maximum time to live. May be zero if the connection
      *   does not have an expiry deadline.
-     * @param tunit time unit.
+     * @param timeUnit time unit.
      */
-    public PoolEntry(final String id, final T route, final C conn,
-            final long timeToLive, final TimeUnit tunit) {
+    public PoolEntry(final T route, final long timeToLive, final TimeUnit timeUnit) {
         super();
-        Args.notNull(route, "Route");
-        Args.notNull(conn, "Connection");
-        Args.notNull(tunit, "Time unit");
-        this.id = id;
-        this.route = route;
-        this.conn = conn;
-        this.created = System.currentTimeMillis();
-        if (timeToLive > 0) {
-            this.validityDeadline = this.created + tunit.toMillis(timeToLive);
-        } else {
-            this.validityDeadline = Long.MAX_VALUE;
-        }
-        this.expiry = this.validityDeadline;
+        this.route = Args.notNull(route, "Route");
+        this.connRef = new AtomicReference<>(null);
+        this.timeToLive = timeToLive > 0 ? (timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS).toMillis(timeToLive) : 0;
     }
 
-    /**
-     * Creates new {@code PoolEntry} instance without an expiry deadline.
-     *
-     * @param id unique identifier of the pool entry. May be {@code null}.
-     * @param route route to the opposite endpoint.
-     * @param conn the connection.
-     */
-    public PoolEntry(final String id, final T route, final C conn) {
-        this(id, route, conn, 0, TimeUnit.MILLISECONDS);
-    }
-
-    public String getId() {
-        return this.id;
+    public PoolEntry(final T route) {
+        this(route, 0, TimeUnit.MILLISECONDS);
     }
 
     public T getRoute() {
@@ -111,11 +84,7 @@ public abstract class PoolEntry<T, C> {
     }
 
     public C getConnection() {
-        return this.conn;
-    }
-
-    public long getCreated() {
-        return this.created;
+        return this.connRef.get();
     }
 
     /**
@@ -129,51 +98,85 @@ public abstract class PoolEntry<T, C> {
         return this.state;
     }
 
-    public void setState(final Object state) {
-        this.state = state;
-    }
-
-    public synchronized long getUpdated() {
+    public long getUpdated() {
         return this.updated;
     }
 
-    public synchronized long getExpiry() {
+    public long getExpiry() {
         return this.expiry;
     }
 
-    public synchronized void updateExpiry(final long time, final TimeUnit tunit) {
-        Args.notNull(tunit, "Time unit");
-        this.updated = System.currentTimeMillis();
-        final long newExpiry;
-        if (time > 0) {
-            newExpiry = this.updated + tunit.toMillis(time);
+    /**
+     * @since 5.0
+     */
+    public boolean hasConnection() {
+        return this.connRef.get() != null;
+    }
+
+    /**
+     * @since 5.0
+     */
+    public void assignConnection(final C conn) {
+        Args.notNull(conn, "connection");
+        if (this.connRef.compareAndSet(null, conn)) {
+            this.created = currentTimeMillis();
+            this.updated = this.created;
+            this.validityDeadline = this.timeToLive > 0 ? System.currentTimeMillis() + this.timeToLive : Long.MAX_VALUE;
+            this.expiry = this.validityDeadline;
         } else {
-            newExpiry = Long.MAX_VALUE;
+            throw new IllegalStateException("Connection already assigned");
         }
-        this.expiry = Math.min(newExpiry, this.validityDeadline);
     }
 
-    public synchronized boolean isExpired(final long now) {
-        return now >= this.expiry;
+    /**
+     * @since 5.0
+     */
+    public void discardConnection(final Callback<C> shutdownCallback) {
+        final C connection = this.connRef.getAndSet(null);
+        if (connection != null) {
+            if (shutdownCallback != null) {
+                shutdownCallback.execute(connection);
+            } else {
+                try {
+                    connection.close();
+                } catch (IOException ignore) {
+                }
+            }
+            this.state = null;
+            this.created = 0;
+            this.updated = 0;
+            this.expiry = 0;
+            this.validityDeadline = 0;
+        }
     }
 
     /**
-     * Invalidates the pool entry and closes the pooled connection associated
-     * with it.
+     * @since 5.0
      */
-    public abstract void close();
+    public void discardConnection() {
+        discardConnection(null);
+    }
 
     /**
-     * Returns {@code true} if the pool entry has been invalidated.
+     * @since 5.0
      */
-    public abstract boolean isClosed();
+    public void updateConnection(final long keepAlive, final TimeUnit timeUnit, final Object state) {
+        Args.notNull(timeUnit, "Time unit");
+        if (this.connRef.get() != null) {
+            this.state = state;
+            final long currentTime = System.currentTimeMillis();
+            final long newExpiry = keepAlive > 0 ? currentTime + timeUnit.toMillis(keepAlive) : Long.MAX_VALUE;
+            this.expiry = Math.min(newExpiry, getValidityDeadline());
+            this.updated = currentTime;
+        } else {
+            throw new IllegalStateException("Connection not assigned");
+        }
+    }
 
     @Override
     public String toString() {
         final StringBuilder buffer = new StringBuilder();
-        buffer.append("[id:");
-        buffer.append(this.id);
-        buffer.append("][route:");
+        buffer.append("[route:");
         buffer.append(this.route);
         buffer.append("][state:");
         buffer.append(this.state);

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/RoutePool.java (from r1772397, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/io/RouteSpecificPool.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/RoutePool.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/RoutePool.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/io/RouteSpecificPool.java&r1=1772397&r2=1772441&rev=1772441&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/io/RouteSpecificPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/RoutePool.java Sat Dec  3 09:47:13 2016
@@ -24,34 +24,31 @@
  * <http://www.apache.org/>.
  *
  */
-package org.apache.hc.core5.pool.io;
+package org.apache.hc.core5.pool;
 
+import java.io.Closeable;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.hc.core5.pool.PoolEntry;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.Asserts;
 
-abstract class RouteSpecificPool<T, C, E extends PoolEntry<T, C>> {
+final class RoutePool<T, C extends Closeable> {
 
     private final T route;
-    private final Set<E> leased;
-    private final LinkedList<E> available;
-    private final LinkedList<PoolEntryFuture<E>> pending;
+    private final Set<PoolEntry<T, C>> leased;
+    private final LinkedList<PoolEntry<T, C>> available;
 
-    RouteSpecificPool(final T route) {
+    RoutePool(final T route) {
         super();
         this.route = route;
         this.leased = new HashSet<>();
         this.available = new LinkedList<>();
-        this.pending = new LinkedList<>();
     }
 
-    protected abstract E createEntry(C conn);
-
     public final T getRoute() {
         return route;
     }
@@ -60,10 +57,6 @@ abstract class RouteSpecificPool<T, C, E
         return this.leased.size();
     }
 
-    public int getPendingCount() {
-        return this.pending.size();
-    }
-
     public int getAvailableCount() {
         return this.available.size();
     }
@@ -72,12 +65,12 @@ abstract class RouteSpecificPool<T, C, E
         return this.available.size() + this.leased.size();
     }
 
-    public E getFree(final Object state) {
+    public PoolEntry<T, C> getFree(final Object state) {
         if (!this.available.isEmpty()) {
             if (state != null) {
-                final Iterator<E> it = this.available.iterator();
+                final Iterator<PoolEntry<T, C>> it = this.available.iterator();
                 while (it.hasNext()) {
-                    final E entry = it.next();
+                    final PoolEntry<T, C> entry = it.next();
                     if (state.equals(entry.getState())) {
                         it.remove();
                         this.leased.add(entry);
@@ -85,9 +78,9 @@ abstract class RouteSpecificPool<T, C, E
                     }
                 }
             }
-            final Iterator<E> it = this.available.iterator();
+            final Iterator<PoolEntry<T, C>> it = this.available.iterator();
             while (it.hasNext()) {
-                final E entry = it.next();
+                final PoolEntry<T, C> entry = it.next();
                 if (entry.getState() == null) {
                     it.remove();
                     this.leased.add(entry);
@@ -98,14 +91,14 @@ abstract class RouteSpecificPool<T, C, E
         return null;
     }
 
-    public E getLastUsed() {
+    public PoolEntry<T, C> getLastUsed() {
         if (!this.available.isEmpty()) {
             return this.available.getLast();
         }
         return null;
     }
 
-    public boolean remove(final E entry) {
+    public boolean remove(final PoolEntry<T, C> entry) {
         Args.notNull(entry, "Pool entry");
         if (!this.available.remove(entry) && !this.leased.remove(entry)) {
             return false;
@@ -113,7 +106,7 @@ abstract class RouteSpecificPool<T, C, E
         return true;
     }
 
-    public void free(final E entry, final boolean reusable) {
+    public void free(final PoolEntry<T, C> entry, final boolean reusable) {
         Args.notNull(entry, "Pool entry");
         final boolean found = this.leased.remove(entry);
         Asserts.check(found, "Entry %s has not been leased from this pool", entry);
@@ -122,42 +115,19 @@ abstract class RouteSpecificPool<T, C, E
         }
     }
 
-    public E add(final C conn) {
-        final E entry = createEntry(conn);
+    public PoolEntry<T, C> createEntry(final long timeToLive, final TimeUnit timeUnit) {
+        final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, timeUnit);
         this.leased.add(entry);
         return entry;
     }
 
-    public void queue(final PoolEntryFuture<E> future) {
-        if (future == null) {
-            return;
-        }
-        this.pending.add(future);
-    }
-
-    public PoolEntryFuture<E> nextPending() {
-        return this.pending.poll();
-    }
-
-    public void unqueue(final PoolEntryFuture<E> future) {
-        if (future == null) {
-            return;
-        }
-
-        this.pending.remove(future);
-    }
-
     public void shutdown() {
-        for (final PoolEntryFuture<E> future: this.pending) {
-            future.cancel(true);
-        }
-        this.pending.clear();
-        for (final E entry: this.available) {
-            entry.close();
+        for (final PoolEntry<T, C> entry: this.available) {
+            entry.discardConnection();
         }
         this.available.clear();
-        for (final E entry: this.leased) {
-            entry.close();
+        for (final PoolEntry<T, C> entry: this.leased) {
+            entry.discardConnection();
         }
         this.leased.clear();
     }
@@ -171,8 +141,6 @@ abstract class RouteSpecificPool<T, C, E
         buffer.append(this.leased.size());
         buffer.append("][available: ");
         buffer.append(this.available.size());
-        buffer.append("][pending: ");
-        buffer.append(this.pending.size());
         buffer.append("]");
         return buffer.toString();
     }

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/RoutePool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/RoutePool.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/RoutePool.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message