hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject [2/6] httpcomponents-client git commit: * HTTP/2 multiplexing HttpAsyncClient implementation * Restructured integration tests to reduce test duplication
Date Mon, 13 Nov 2017 21:45:57 GMT
http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/6228a736/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
new file mode 100644
index 0000000..aab6f78
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
@@ -0,0 +1,308 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.async.AsyncExecCallback;
+import org.apache.hc.client5.http.async.AsyncExecChain;
+import org.apache.hc.client5.http.async.AsyncExecRuntime;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.client5.http.auth.AuthSchemeProvider;
+import org.apache.hc.client5.http.auth.CredentialsProvider;
+import org.apache.hc.client5.http.config.Configurable;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.cookie.CookieSpecProvider;
+import org.apache.hc.client5.http.cookie.CookieStore;
+import org.apache.hc.client5.http.impl.ExecSupport;
+import org.apache.hc.client5.http.impl.RequestCopier;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.config.Lookup;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
+
+abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
+
+    private final AsyncExecChainElement execChain;
+    private final Lookup<CookieSpecProvider> cookieSpecRegistry;
+    private final Lookup<AuthSchemeProvider> authSchemeRegistry;
+    private final CookieStore cookieStore;
+    private final CredentialsProvider credentialsProvider;
+    private final RequestConfig defaultConfig;
+    private final List<Closeable> closeables;
+
+    InternalAbstractHttpAsyncClient(
+            final DefaultConnectingIOReactor ioReactor,
+            final AsyncPushConsumerRegistry pushConsumerRegistry,
+            final ThreadFactory threadFactory,
+            final AsyncExecChainElement execChain,
+            final Lookup<CookieSpecProvider> cookieSpecRegistry,
+            final Lookup<AuthSchemeProvider> authSchemeRegistry,
+            final CookieStore cookieStore,
+            final CredentialsProvider credentialsProvider,
+            final RequestConfig defaultConfig,
+            final List<Closeable> closeables) {
+        super(ioReactor, pushConsumerRegistry, threadFactory);
+        this.execChain = execChain;
+        this.cookieSpecRegistry = cookieSpecRegistry;
+        this.authSchemeRegistry = authSchemeRegistry;
+        this.cookieStore = cookieStore;
+        this.credentialsProvider = credentialsProvider;
+        this.defaultConfig = defaultConfig;
+        this.closeables = closeables;
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        if (closeables != null) {
+            for (final Closeable closeable: closeables) {
+                try {
+                    closeable.close();
+                } catch (final IOException ex) {
+                    log.error(ex.getMessage(), ex);
+                }
+            }
+        }
+    }
+
+    private void setupContext(final HttpClientContext context) {
+        if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
+            context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
+        }
+        if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
+            context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
+        }
+        if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
+            context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
+        }
+        if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
+            context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
+        }
+        if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
+            context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
+        }
+    }
+
+    abstract AsyncExecRuntime crerateAsyncExecRuntime();
+
+    abstract HttpRoute determineRoute(HttpRequest request, HttpClientContext clientContext) throws HttpException;
+
+    @Override
+    public <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        ensureRunning();
+        final BasicFuture<T> future = new BasicFuture<>(callback);
+        try {
+            final HttpClientContext clientContext = HttpClientContext.adapt(context);
+            requestProducer.sendRequest(new RequestChannel() {
+
+                @Override
+                public void sendRequest(
+                        final HttpRequest request,
+                        final EntityDetails entityDetails) throws HttpException, IOException {
+
+                    RequestConfig requestConfig = null;
+                    if (request instanceof Configurable) {
+                        requestConfig = ((Configurable) request).getConfig();
+                    }
+                    if (requestConfig != null) {
+                        clientContext.setRequestConfig(requestConfig);
+                    }
+                    final HttpRoute route = determineRoute(request, clientContext);
+                    final String exchangeId = String.format("ex-%08X", ExecSupport.getNextExecNumber());
+                    final AsyncExecRuntime execRuntime = crerateAsyncExecRuntime();
+                    if (log.isDebugEnabled()) {
+                        log.debug(exchangeId + ": preparing request execution");
+                    }
+
+                    setupContext(clientContext);
+
+                    final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, clientContext, execRuntime);
+                    final AtomicReference<T> resultRef = new AtomicReference<>(null);
+                    final AtomicBoolean outputTerminated = new AtomicBoolean(false);
+                    execChain.execute(
+                            RequestCopier.INSTANCE.copy(request),
+                            entityDetails != null ? new AsyncEntityProducer() {
+
+                                @Override
+                                public void releaseResources() {
+                                    requestProducer.releaseResources();
+                                }
+
+                                @Override
+                                public void failed(final Exception cause) {
+                                    requestProducer.failed(cause);
+                                }
+
+                                @Override
+                                public boolean isRepeatable() {
+                                    //TODO: use AsyncRequestProducer#isRepeatable once available
+                                    return requestProducer instanceof SimpleRequestProducer;
+                                }
+
+                                @Override
+                                public long getContentLength() {
+                                    return entityDetails.getContentLength();
+                                }
+
+                                @Override
+                                public String getContentType() {
+                                    return entityDetails.getContentType();
+                                }
+
+                                @Override
+                                public String getContentEncoding() {
+                                    return entityDetails.getContentEncoding();
+                                }
+
+                                @Override
+                                public boolean isChunked() {
+                                    return entityDetails.isChunked();
+                                }
+
+                                @Override
+                                public Set<String> getTrailerNames() {
+                                    return entityDetails.getTrailerNames();
+                                }
+
+                                @Override
+                                public int available() {
+                                    return requestProducer.available();
+                                }
+
+                                @Override
+                                public void produce(final DataStreamChannel channel) throws IOException {
+                                    if (outputTerminated.get()) {
+                                        channel.endStream();
+                                        return;
+                                    }
+                                    requestProducer.produce(channel);
+                                }
+
+                            } : null,
+                            scope,
+                            new AsyncExecCallback() {
+
+                                @Override
+                                public AsyncDataConsumer handleResponse(
+                                        final HttpResponse response,
+                                        final EntityDetails entityDetails) throws HttpException, IOException {
+                                    if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
+                                        outputTerminated.set(true);
+                                        requestProducer.releaseResources();
+                                    }
+                                    responseConsumer.consumeResponse(response, entityDetails,
+                                            //TODO: eliminate this callback after upgrade to HttpCore 5.0b2
+                                            new FutureCallback<T>() {
+
+                                                @Override
+                                                public void completed(final T result) {
+                                                    resultRef.set(result);
+                                                }
+
+                                                @Override
+                                                public void failed(final Exception ex) {
+                                                    future.failed(ex);
+                                                }
+
+                                                @Override
+                                                public void cancelled() {
+                                                    future.cancel();
+                                                }
+
+                                            });
+                                    return responseConsumer;
+                                }
+
+                                @Override
+                                public void completed() {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug(exchangeId + ": message exchange successfully completed");
+                                    }
+                                    try {
+                                        execRuntime.releaseConnection();
+                                        future.completed(resultRef.getAndSet(null));
+                                    } finally {
+                                        responseConsumer.releaseResources();
+                                        requestProducer.releaseResources();
+                                    }
+                                }
+
+                                @Override
+                                public void failed(final Exception cause) {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug(exchangeId + ": request failed: " + cause.getMessage());
+                                    }
+                                    try {
+                                        execRuntime.discardConnection();
+                                        responseConsumer.failed(cause);
+                                    } finally {
+                                        try {
+                                            future.failed(cause);
+                                        } finally {
+                                            responseConsumer.releaseResources();
+                                            requestProducer.releaseResources();
+                                        }
+                                    }
+                                }
+
+                            });
+                }
+
+            });
+        } catch (final HttpException | IOException ex) {
+            future.failed(ex);
+        }
+        return future;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/6228a736/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java
new file mode 100644
index 0000000..f68c69a
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncClient.java
@@ -0,0 +1,88 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.async.AsyncExecRuntime;
+import org.apache.hc.client5.http.auth.AuthSchemeProvider;
+import org.apache.hc.client5.http.auth.CredentialsProvider;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.cookie.CookieSpecProvider;
+import org.apache.hc.client5.http.cookie.CookieStore;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.client5.http.routing.HttpRoutePlanner;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.config.Lookup;
+import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
+import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
+
+class InternalHttp2AsyncClient extends InternalAbstractHttpAsyncClient {
+
+    private final HttpRoutePlanner routePlanner;
+    private final H2ConnPool connPool;
+
+    InternalHttp2AsyncClient(
+            final DefaultConnectingIOReactor ioReactor,
+            final AsyncExecChainElement execChain,
+            final AsyncPushConsumerRegistry pushConsumerRegistry,
+            final ThreadFactory threadFactory,
+            final H2ConnPool connPool,
+            final HttpRoutePlanner routePlanner,
+            final Lookup<CookieSpecProvider> cookieSpecRegistry,
+            final Lookup<AuthSchemeProvider> authSchemeRegistry,
+            final CookieStore cookieStore,
+            final CredentialsProvider credentialsProvider,
+            final RequestConfig defaultConfig,
+            final List<Closeable> closeables) {
+        super(ioReactor, pushConsumerRegistry, threadFactory, execChain,
+                cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, defaultConfig, closeables);
+        this.connPool = connPool;
+        this.routePlanner = routePlanner;
+    }
+
+    @Override
+    AsyncExecRuntime crerateAsyncExecRuntime() {
+        return new InternalHttp2AsyncExecRuntime(log, connPool);
+    }
+
+    @Override
+    HttpRoute determineRoute(final HttpRequest request, final HttpClientContext clientContext) throws HttpException {
+        final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
+        final HttpRoute route = routePlanner.determineRoute(target, clientContext);
+        if (route.isTunnelled()) {
+            throw new HttpException("HTTP/2 tunneling not supported");
+        }
+        return route;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/6228a736/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java
new file mode 100644
index 0000000..624dafe
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttp2AsyncExecRuntime.java
@@ -0,0 +1,245 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.InterruptedIOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.async.AsyncExecRuntime;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.ConnPoolSupport;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.command.ExecutionCommand;
+import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.logging.log4j.Logger;
+
+class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
+
+    private final Logger log;
+    private final H2ConnPool connPool;
+    private final AtomicReference<Endpoint> sessionRef;
+    private volatile boolean reusable;
+
+    InternalHttp2AsyncExecRuntime(final Logger log, final H2ConnPool connPool) {
+        super();
+        this.log = log;
+        this.connPool = connPool;
+        this.sessionRef = new AtomicReference<>(null);
+    }
+
+    @Override
+    public boolean isConnectionAcquired() {
+        return sessionRef.get() != null;
+    }
+
+    @Override
+    public void acquireConnection(
+            final HttpRoute route,
+            final Object object,
+            final HttpClientContext context,
+            final FutureCallback<AsyncExecRuntime> callback) {
+        if (sessionRef.get() == null) {
+            final HttpHost target = route.getTargetHost();
+            final RequestConfig requestConfig = context.getRequestConfig();
+            connPool.getSession(target, requestConfig.getConnectTimeout(), new FutureCallback<IOSession>() {
+
+                @Override
+                public void completed(final IOSession ioSession) {
+                    sessionRef.set(new Endpoint(target, ioSession));
+                    reusable = true;
+                    callback.completed(InternalHttp2AsyncExecRuntime.this);
+                }
+
+                @Override
+                public void failed(final Exception ex) {
+                    callback.failed(ex);
+                }
+
+                @Override
+                public void cancelled() {
+                    callback.cancelled();
+                }
+
+            });
+        } else {
+            callback.completed(this);
+        }
+    }
+
+    @Override
+    public void releaseConnection() {
+        final Endpoint endpoint = sessionRef.getAndSet(null);
+        if (endpoint != null && !reusable) {
+            endpoint.session.shutdown(ShutdownType.GRACEFUL);
+        }
+    }
+
+    @Override
+    public void discardConnection() {
+        final Endpoint endpoint = sessionRef.getAndSet(null);
+        if (endpoint != null) {
+            endpoint.session.shutdown(ShutdownType.GRACEFUL);
+        }
+    }
+
+    @Override
+    public boolean validateConnection() {
+        if (reusable) {
+            final Endpoint endpoint = sessionRef.get();
+            return endpoint != null && !endpoint.session.isClosed();
+        } else {
+            final Endpoint endpoint = sessionRef.getAndSet(null);
+            if (endpoint != null) {
+                endpoint.session.shutdown(ShutdownType.GRACEFUL);
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean isConnected() {
+        final Endpoint endpoint = sessionRef.get();
+        return endpoint != null && !endpoint.session.isClosed();
+    }
+
+
+    Endpoint ensureValid() {
+        final Endpoint endpoint = sessionRef.get();
+        if (endpoint == null) {
+            throw new IllegalStateException("I/O session not acquired / already released");
+        }
+        return endpoint;
+    }
+
+    @Override
+    public void connect(
+            final HttpClientContext context,
+            final FutureCallback<AsyncExecRuntime> callback) {
+        final Endpoint endpoint = ensureValid();
+        if (!endpoint.session.isClosed()) {
+            callback.completed(this);
+        } else {
+            final HttpHost target = endpoint.target;
+            final RequestConfig requestConfig = context.getRequestConfig();
+            connPool.getSession(target, requestConfig.getConnectTimeout(), new FutureCallback<IOSession>() {
+
+                @Override
+                public void completed(final IOSession ioSession) {
+                    sessionRef.set(new Endpoint(target, ioSession));
+                    reusable = true;
+                    callback.completed(InternalHttp2AsyncExecRuntime.this);
+                }
+
+                @Override
+                public void failed(final Exception ex) {
+                    callback.failed(ex);
+                }
+
+                @Override
+                public void cancelled() {
+                    callback.cancelled();
+                }
+
+            });
+        }
+
+    }
+
+    @Override
+    public void upgradeTls(final HttpClientContext context) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
+        final Endpoint endpoint = ensureValid();
+        final IOSession session = endpoint.session;
+        if (!session.isClosed()) {
+            if (log.isDebugEnabled()) {
+                log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
+            }
+            session.addLast(new ExecutionCommand(exchangeHandler, context));
+        } else {
+            final HttpHost target = endpoint.target;
+            final RequestConfig requestConfig = context.getRequestConfig();
+            connPool.getSession(target, requestConfig.getConnectTimeout(), new FutureCallback<IOSession>() {
+
+                @Override
+                public void completed(final IOSession ioSession) {
+                    sessionRef.set(new Endpoint(target, ioSession));
+                    reusable = true;
+                    if (log.isDebugEnabled()) {
+                        log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
+                    }
+                    session.addLast(new ExecutionCommand(exchangeHandler, context));
+                }
+
+                @Override
+                public void failed(final Exception ex) {
+                    exchangeHandler.failed(ex);
+                }
+
+                @Override
+                public void cancelled() {
+                    exchangeHandler.failed(new InterruptedIOException());
+                }
+
+            });
+        }
+
+    }
+
+    @Override
+    public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void markConnectionNonReusable() {
+        reusable = false;
+    }
+
+    static class Endpoint {
+
+        final HttpHost target;
+        final IOSession session;
+
+        Endpoint(final HttpHost target, final IOSession session) {
+            this.target = target;
+            this.session = session;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/6228a736/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
index c8f7e8e..8d23479 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
@@ -27,63 +27,33 @@
 package org.apache.hc.client5.http.impl.async;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.client5.http.HttpRoute;
-import org.apache.hc.client5.http.async.AsyncExecCallback;
-import org.apache.hc.client5.http.async.AsyncExecChain;
 import org.apache.hc.client5.http.async.AsyncExecRuntime;
-import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
 import org.apache.hc.client5.http.auth.AuthSchemeProvider;
 import org.apache.hc.client5.http.auth.CredentialsProvider;
-import org.apache.hc.client5.http.config.Configurable;
 import org.apache.hc.client5.http.config.RequestConfig;
 import org.apache.hc.client5.http.cookie.CookieSpecProvider;
 import org.apache.hc.client5.http.cookie.CookieStore;
-import org.apache.hc.client5.http.impl.ExecSupport;
-import org.apache.hc.client5.http.impl.RequestCopier;
 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
 import org.apache.hc.client5.http.routing.HttpRoutePlanner;
-import org.apache.hc.core5.concurrent.BasicFuture;
-import org.apache.hc.core5.concurrent.FutureCallback;
-import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.HttpRequest;
-import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.HttpVersion;
 import org.apache.hc.core5.http.ProtocolVersion;
 import org.apache.hc.core5.http.config.Lookup;
-import org.apache.hc.core5.http.nio.AsyncDataConsumer;
-import org.apache.hc.core5.http.nio.AsyncEntityProducer;
-import org.apache.hc.core5.http.nio.AsyncRequestProducer;
-import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
-import org.apache.hc.core5.http.nio.DataStreamChannel;
-import org.apache.hc.core5.http.nio.RequestChannel;
-import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http2.HttpVersionPolicy;
 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
 
-class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
+class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClient {
 
     private final AsyncClientConnectionManager connmgr;
-    private final AsyncExecChainElement execChain;
     private final HttpRoutePlanner routePlanner;
     private final HttpVersionPolicy versionPolicy;
-    private final Lookup<CookieSpecProvider> cookieSpecRegistry;
-    private final Lookup<AuthSchemeProvider> authSchemeRegistry;
-    private final CookieStore cookieStore;
-    private final CredentialsProvider credentialsProvider;
-    private final RequestConfig defaultConfig;
-    private final List<Closeable> closeables;
 
     InternalHttpAsyncClient(
             final DefaultConnectingIOReactor ioReactor,
@@ -99,227 +69,27 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
             final CredentialsProvider credentialsProvider,
             final RequestConfig defaultConfig,
             final List<Closeable> closeables) {
-        super(ioReactor, pushConsumerRegistry, threadFactory);
+        super(ioReactor, pushConsumerRegistry, threadFactory, execChain,
+                cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, defaultConfig, closeables);
         this.connmgr = connmgr;
-        this.execChain = execChain;
         this.routePlanner = routePlanner;
         this.versionPolicy = versionPolicy;
-        this.cookieSpecRegistry = cookieSpecRegistry;
-        this.authSchemeRegistry = authSchemeRegistry;
-        this.cookieStore = cookieStore;
-        this.credentialsProvider = credentialsProvider;
-        this.defaultConfig = defaultConfig;
-        this.closeables = closeables;
     }
 
     @Override
-    public void close() {
-        super.close();
-        if (closeables != null) {
-            for (final Closeable closeable: closeables) {
-                try {
-                    closeable.close();
-                } catch (final IOException ex) {
-                    log.error(ex.getMessage(), ex);
-                }
-            }
-        }
-    }
-
-    private void setupContext(final HttpClientContext context) {
-        if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
-            context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
-        }
-        if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
-            context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
-        }
-        if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
-            context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
-        }
-        if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
-            context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
-        }
-        if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
-            context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
-        }
+    AsyncExecRuntime crerateAsyncExecRuntime() {
+        return new InternalHttpAsyncExecRuntime(log, connmgr, getConnectionInitiator(), versionPolicy);
     }
 
     @Override
-    public <T> Future<T> execute(
-            final AsyncRequestProducer requestProducer,
-            final AsyncResponseConsumer<T> responseConsumer,
-            final HttpContext context,
-            final FutureCallback<T> callback) {
-        ensureRunning();
-        final BasicFuture<T> future = new BasicFuture<>(callback);
-        try {
-            final HttpClientContext clientContext = HttpClientContext.adapt(context);
-            requestProducer.sendRequest(new RequestChannel() {
-
-                @Override
-                public void sendRequest(
-                        final HttpRequest request,
-                        final EntityDetails entityDetails) throws HttpException, IOException {
-
-                    RequestConfig requestConfig = null;
-                    if (request instanceof Configurable) {
-                        requestConfig = ((Configurable) request).getConfig();
-                    }
-                    if (requestConfig != null) {
-                        clientContext.setRequestConfig(requestConfig);
-                    }
-                    final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
-                    final HttpRoute route = routePlanner.determineRoute(target, clientContext);
-                    final String exchangeId = String.format("ex-%08X", ExecSupport.getNextExecNumber());
-                    final AsyncExecRuntime execRuntime = new AsyncExecRuntimeImpl(log, connmgr, getConnectionInitiator(), versionPolicy);
-                    if (log.isDebugEnabled()) {
-                        log.debug(exchangeId + ": preparing request execution");
-                    }
-
-                    final ProtocolVersion protocolVersion = clientContext.getProtocolVersion();
-                    if (route.isTunnelled() && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
-                        throw new HttpException("HTTP/2 tunneling not supported");
-                    }
-
-                    setupContext(clientContext);
-
-                    final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, clientContext, execRuntime);
-                    final AtomicReference<T> resultRef = new AtomicReference<>(null);
-                    final AtomicBoolean outputTerminated = new AtomicBoolean(false);
-                    execChain.execute(
-                            RequestCopier.INSTANCE.copy(request),
-                            entityDetails != null ? new AsyncEntityProducer() {
-
-                                @Override
-                                public void releaseResources() {
-                                    requestProducer.releaseResources();
-                                }
-
-                                @Override
-                                public void failed(final Exception cause) {
-                                    requestProducer.failed(cause);
-                                }
-
-                                @Override
-                                public boolean isRepeatable() {
-                                    //TODO: use AsyncRequestProducer#isRepeatable once available
-                                    return requestProducer instanceof SimpleRequestProducer;
-                                }
-
-                                @Override
-                                public long getContentLength() {
-                                    return entityDetails.getContentLength();
-                                }
-
-                                @Override
-                                public String getContentType() {
-                                    return entityDetails.getContentType();
-                                }
-
-                                @Override
-                                public String getContentEncoding() {
-                                    return entityDetails.getContentEncoding();
-                                }
-
-                                @Override
-                                public boolean isChunked() {
-                                    return entityDetails.isChunked();
-                                }
-
-                                @Override
-                                public Set<String> getTrailerNames() {
-                                    return entityDetails.getTrailerNames();
-                                }
-
-                                @Override
-                                public int available() {
-                                    return requestProducer.available();
-                                }
-
-                                @Override
-                                public void produce(final DataStreamChannel channel) throws IOException {
-                                    if (outputTerminated.get()) {
-                                        channel.endStream();
-                                        return;
-                                    }
-                                    requestProducer.produce(channel);
-                                }
-
-                            } : null,
-                            scope,
-                            new AsyncExecCallback() {
-
-                                @Override
-                                public AsyncDataConsumer handleResponse(
-                                        final HttpResponse response,
-                                        final EntityDetails entityDetails) throws HttpException, IOException {
-                                    if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
-                                        outputTerminated.set(true);
-                                        requestProducer.releaseResources();
-                                    }
-                                    responseConsumer.consumeResponse(response, entityDetails,
-                                            //TODO: eliminate this callback after upgrade to HttpCore 5.0b2
-                                            new FutureCallback<T>() {
-
-                                                @Override
-                                                public void completed(final T result) {
-                                                    resultRef.set(result);
-                                                }
-
-                                                @Override
-                                                public void failed(final Exception ex) {
-                                                    future.failed(ex);
-                                                }
-
-                                                @Override
-                                                public void cancelled() {
-                                                    future.cancel();
-                                                }
-
-                                            });
-                                    return responseConsumer;
-                                }
-
-                                @Override
-                                public void completed() {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug(exchangeId + ": message exchange successfully completed");
-                                    }
-                                    try {
-                                        execRuntime.releaseConnection();
-                                        future.completed(resultRef.getAndSet(null));
-                                    } finally {
-                                        responseConsumer.releaseResources();
-                                        requestProducer.releaseResources();
-                                    }
-                                }
-
-                                @Override
-                                public void failed(final Exception cause) {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug(exchangeId + ": request failed: " + cause.getMessage());
-                                    }
-                                    try {
-                                        execRuntime.discardConnection();
-                                        responseConsumer.failed(cause);
-                                    } finally {
-                                        try {
-                                            future.failed(cause);
-                                        } finally {
-                                            responseConsumer.releaseResources();
-                                            requestProducer.releaseResources();
-                                        }
-                                    }
-                                }
-
-                            });
-                }
-
-            });
-        } catch (final HttpException | IOException ex) {
-            future.failed(ex);
+    HttpRoute determineRoute(final HttpRequest request, final HttpClientContext clientContext) throws HttpException {
+        final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
+        final HttpRoute route = routePlanner.determineRoute(target, clientContext);
+        final ProtocolVersion protocolVersion = clientContext.getProtocolVersion();
+        if (route.isTunnelled() && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
+            throw new HttpException("HTTP/2 tunneling not supported");
         }
-        return future;
+        return route;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-client/blob/6228a736/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
----------------------------------------------------------------------
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
new file mode 100644
index 0000000..23ed986
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
@@ -0,0 +1,276 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.async.AsyncExecRuntime;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.ConnPoolSupport;
+import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
+import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.reactor.ConnectionInitiator;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.logging.log4j.Logger;
+
+class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
+
+    private final Logger log;
+    private final AsyncClientConnectionManager manager;
+    private final ConnectionInitiator connectionInitiator;
+    private final HttpVersionPolicy versionPolicy;
+    private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
+    private volatile boolean reusable;
+    private volatile Object state;
+    private volatile TimeValue validDuration;
+
+    InternalHttpAsyncExecRuntime(
+            final Logger log,
+            final AsyncClientConnectionManager manager,
+            final ConnectionInitiator connectionInitiator,
+            final HttpVersionPolicy versionPolicy) {
+        super();
+        this.log = log;
+        this.manager = manager;
+        this.connectionInitiator = connectionInitiator;
+        this.versionPolicy = versionPolicy;
+        this.endpointRef = new AtomicReference<>(null);
+        this.validDuration = TimeValue.NEG_ONE_MILLISECONDS;
+    }
+
+    @Override
+    public boolean isConnectionAcquired() {
+        return endpointRef.get() != null;
+    }
+
+    @Override
+    public void acquireConnection(
+            final HttpRoute route,
+            final Object object,
+            final HttpClientContext context,
+            final FutureCallback<AsyncExecRuntime> callback) {
+        if (endpointRef.get() == null) {
+            state = object;
+            final RequestConfig requestConfig = context.getRequestConfig();
+            manager.lease(route, object, requestConfig.getConnectionRequestTimeout(), new FutureCallback<AsyncConnectionEndpoint>() {
+
+                @Override
+                public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
+                    endpointRef.set(connectionEndpoint);
+                    reusable = connectionEndpoint.isConnected();
+                    callback.completed(InternalHttpAsyncExecRuntime.this);
+                }
+
+                @Override
+                public void failed(final Exception ex) {
+                    callback.failed(ex);
+                }
+
+                @Override
+                public void cancelled() {
+                    callback.cancelled();
+                }
+            });
+        } else {
+            callback.completed(this);
+        }
+    }
+
+    private void discardEndpoint(final AsyncConnectionEndpoint endpoint) {
+        try {
+            endpoint.shutdown();
+            if (log.isDebugEnabled()) {
+                log.debug(ConnPoolSupport.getId(endpoint) + ": discarding endpoint");
+            }
+        } catch (final IOException ex) {
+            if (log.isDebugEnabled()) {
+                log.debug(ConnPoolSupport.getId(endpoint) + ": " + ex.getMessage(), ex);
+            }
+        } finally {
+            manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
+        }
+    }
+
+    @Override
+    public void releaseConnection() {
+        final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
+        if (endpoint != null) {
+            if (reusable) {
+                if (log.isDebugEnabled()) {
+                    log.debug(ConnPoolSupport.getId(endpoint) + ": releasing valid endpoint");
+                }
+                manager.release(endpoint, state, validDuration);
+            } else {
+                discardEndpoint(endpoint);
+            }
+        }
+    }
+
+    @Override
+    public void discardConnection() {
+        final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
+        if (endpoint != null) {
+            discardEndpoint(endpoint);
+        }
+    }
+
+    @Override
+    public boolean validateConnection() {
+        if (reusable) {
+            final AsyncConnectionEndpoint endpoint = endpointRef.get();
+            return endpoint != null && endpoint.isConnected();
+        } else {
+            final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
+            if (endpoint != null) {
+                discardEndpoint(endpoint);
+            }
+        }
+        return false;
+    }
+
+    AsyncConnectionEndpoint ensureValid() {
+        final AsyncConnectionEndpoint endpoint = endpointRef.get();
+        if (endpoint == null) {
+            throw new IllegalStateException("Endpoint not acquired / already released");
+        }
+        return endpoint;
+    }
+
+    @Override
+    public boolean isConnected() {
+        final AsyncConnectionEndpoint endpoint = endpointRef.get();
+        return endpoint != null && endpoint.isConnected();
+    }
+
+    @Override
+    public void connect(
+            final HttpClientContext context,
+            final FutureCallback<AsyncExecRuntime> callback) {
+        final AsyncConnectionEndpoint endpoint = ensureValid();
+        if (endpoint.isConnected()) {
+            callback.completed(this);
+        } else {
+            final RequestConfig requestConfig = context.getRequestConfig();
+            manager.connect(
+                    endpoint,
+                    connectionInitiator,
+                    requestConfig.getConnectTimeout(),
+                    versionPolicy,
+                    context,
+                    new FutureCallback<AsyncConnectionEndpoint>() {
+
+                        @Override
+                        public void completed(final AsyncConnectionEndpoint endpoint) {
+                            final TimeValue socketTimeout = requestConfig.getSocketTimeout();
+                            if (TimeValue.isPositive(socketTimeout)) {
+                                endpoint.setSocketTimeout(socketTimeout.toMillisIntBound());
+                            }
+                            callback.completed(InternalHttpAsyncExecRuntime.this);
+                        }
+
+                        @Override
+                        public void failed(final Exception ex) {
+                            callback.failed(ex);
+                        }
+
+                        @Override
+                        public void cancelled() {
+                            callback.cancelled();
+                        }
+
+            });
+        }
+
+    }
+
+    @Override
+    public void upgradeTls(final HttpClientContext context) {
+        final AsyncConnectionEndpoint endpoint = ensureValid();
+        manager.upgrade(endpoint, versionPolicy, context);
+    }
+
+    @Override
+    public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
+        final AsyncConnectionEndpoint endpoint = ensureValid();
+        if (endpoint.isConnected()) {
+            if (log.isDebugEnabled()) {
+                log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
+            }
+            endpoint.execute(exchangeHandler, context);
+        } else {
+            connect(context, new FutureCallback<AsyncExecRuntime>() {
+
+                @Override
+                public void completed(final AsyncExecRuntime runtime) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
+                    }
+                    try {
+                        endpoint.execute(exchangeHandler, context);
+                    } catch (final RuntimeException ex) {
+                        failed(ex);
+                    }
+                }
+
+                @Override
+                public void failed(final Exception ex) {
+                    exchangeHandler.failed(ex);
+                }
+
+                @Override
+                public void cancelled() {
+                    exchangeHandler.failed(new InterruptedIOException());
+                }
+
+            });
+        }
+
+    }
+
+    @Override
+    public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
+        reusable = true;
+        state = newState;
+        validDuration = newValidDuration;
+    }
+
+    @Override
+    public void markConnectionNonReusable() {
+        reusable = false;
+        state = null;
+        validDuration = null;
+    }
+
+}


Mime
View raw message