hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1793320 [4/5] - in /httpcomponents/httpclient/trunk: httpclient5-testing/src/main/java/org/apache/hc/client5/testing/async/ httpclient5-testing/src/main/java/org/apache/hc/client5/testing/classic/ httpclient5-testing/src/test/java/org/apac...
Date Mon, 01 May 2017 12:39:18 GMT
Added: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java?rev=1793320&view=auto
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java (added)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java Mon May  1 12:39:16 2017
@@ -0,0 +1,126 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.IOException;
+
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.async.AsyncExecCallback;
+import org.apache.hc.client5.http.async.AsyncExecChain;
+import org.apache.hc.client5.http.async.AsyncExecChainHandler;
+import org.apache.hc.client5.http.impl.ExecSupport;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.client5.http.sync.HttpRequestRetryHandler;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.util.Args;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+class AsyncRetryExec implements AsyncExecChainHandler {
+
+    private final Logger log = LogManager.getLogger(getClass());
+
+    private final HttpRequestRetryHandler retryHandler;
+
+    public AsyncRetryExec(final HttpRequestRetryHandler retryHandler) {
+        Args.notNull(retryHandler, "HTTP request retry handler");
+        this.retryHandler = retryHandler;
+    }
+
+    private void internalExecute(
+            final int execCount,
+            final HttpRequest request,
+            final AsyncEntityProducer entityProducer,
+            final AsyncExecChain.Scope scope,
+            final AsyncExecChain chain,
+            final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
+
+        chain.proceed(ExecSupport.copy(request), entityProducer, scope, new AsyncExecCallback() {
+
+            @Override
+            public AsyncDataConsumer handleResponse(
+                    final HttpResponse response,
+                    final EntityDetails entityDetails) throws HttpException, IOException {
+                return asyncExecCallback.handleResponse(response, entityDetails);
+            }
+
+            @Override
+            public void completed() {
+                asyncExecCallback.completed();
+            }
+
+            @Override
+            public void failed(final Exception cause) {
+                if (cause instanceof IOException) {
+                    final HttpRoute route = scope.route;
+                    final HttpClientContext clientContext = scope.clientContext;
+                    if (retryHandler.retryRequest(request, (IOException) cause, execCount, clientContext)) {
+                        if (log.isInfoEnabled()) {
+                            log.info("I/O exception ("+ cause.getClass().getName() +
+                                    ") caught when processing request to "
+                                    + route +
+                                    ": "
+                                    + cause.getMessage());
+                        }
+                        if (log.isDebugEnabled()) {
+                            log.debug(cause.getMessage(), cause);
+                        }
+                        if (log.isInfoEnabled()) {
+                            log.info("Retrying request to " + route);
+                        }
+                        try {
+                            scope.execRuntime.discardConnection();
+                            internalExecute(execCount + 1, request, entityProducer, scope, chain, asyncExecCallback);
+                            return;
+                        } catch (IOException | HttpException ex) {
+                            log.error(ex.getMessage(), ex);
+                        }
+                    }
+                }
+                asyncExecCallback.failed(cause);
+            }
+
+        });
+
+    }
+
+    @Override
+    public void execute(
+            final HttpRequest request,
+            final AsyncEntityProducer entityProducer,
+            final AsyncExecChain.Scope scope,
+            final AsyncExecChain chain,
+            final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
+        internalExecute(1, request, entityProducer, scope, chain, asyncExecCallback);
+    }
+
+}

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java?rev=1793320&view=auto
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java (added)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java Mon May  1 12:39:16 2017
@@ -0,0 +1,94 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.nio.AsyncDataProducer;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.util.Args;
+
+public class BasicAsyncEntityProducer implements AsyncEntityProducer {
+
+    private final AsyncDataProducer dataProducer;
+    private final EntityDetails entityDetails;
+
+    BasicAsyncEntityProducer(final AsyncDataProducer dataProducer, final EntityDetails entityDetails) {
+        this.dataProducer = Args.notNull(dataProducer, "Data producer");
+        this.entityDetails = Args.notNull(entityDetails, "Entity details");
+    }
+
+    @Override
+    public void releaseResources() {
+        dataProducer.releaseResources();
+    }
+
+    @Override
+    public void failed(final Exception cause) {
+        dataProducer.releaseResources();
+    }
+
+    @Override
+    public long getContentLength() {
+        return entityDetails.getContentLength();
+    }
+
+    @Override
+    public String getContentType() {
+        return entityDetails.getContentType();
+    }
+
+    @Override
+    public String getContentEncoding() {
+        return entityDetails.getContentEncoding();
+    }
+
+    @Override
+    public boolean isChunked() {
+        return entityDetails.isChunked();
+    }
+
+    @Override
+    public Set<String> getTrailerNames() {
+        return entityDetails.getTrailerNames();
+    }
+
+    @Override
+    public int available() {
+        return dataProducer.available();
+    }
+
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        dataProducer.produce(channel);
+    }
+
+}
\ No newline at end of file

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java?rev=1793320&r1=1793319&r2=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java Mon May  1 12:39:16 2017
@@ -31,16 +31,20 @@ import java.util.List;
 import java.util.concurrent.Future;
 
 import org.apache.hc.client5.http.async.HttpAsyncClient;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
 import org.apache.hc.core5.annotation.Contract;
 import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.BasicFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.function.Supplier;
-import org.apache.hc.core5.http.HttpHost;
-import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.reactor.ExceptionEvent;
 import org.apache.hc.core5.reactor.IOReactorStatus;
@@ -66,12 +70,6 @@ public abstract class CloseableHttpAsync
 
     public abstract void shutdown(ShutdownType shutdownType);
 
-    public final Future<AsyncClientEndpoint> lease(
-            final HttpHost host,
-            final FutureCallback<AsyncClientEndpoint> callback) {
-        return lease(host, HttpClientContext.create(), callback);
-    }
-
     public final <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
@@ -83,4 +81,36 @@ public abstract class CloseableHttpAsync
         register(null, uriPattern, supplier);
     }
 
+    public final Future<SimpleHttpResponse> execute(
+            final SimpleHttpRequest request,
+            final HttpContext context,
+            final FutureCallback<SimpleHttpResponse> callback) {
+        final BasicFuture<SimpleHttpResponse> future = new BasicFuture<>(callback);
+        execute(new SimpleRequestProducer(request), new SimpleResponseConsumer(), context, new FutureCallback<SimpleHttpResponse>() {
+
+            @Override
+            public void completed(final SimpleHttpResponse response) {
+                future.completed(response);
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                future.failed(ex);
+            }
+
+            @Override
+            public void cancelled() {
+                future.cancel(true);
+            }
+
+        });
+        return future;
+    }
+
+    public final Future<SimpleHttpResponse> execute(
+            final SimpleHttpRequest request,
+            final FutureCallback<SimpleHttpResponse> callback) {
+        return execute(request, HttpClientContext.create(), callback);
+    }
+
 }

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java?rev=1793320&r1=1793319&r2=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java Mon May  1 12:39:16 2017
@@ -37,22 +37,31 @@ import java.util.List;
 
 import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
 import org.apache.hc.client5.http.SchemePortResolver;
+import org.apache.hc.client5.http.async.AsyncExecChainHandler;
 import org.apache.hc.client5.http.config.RequestConfig;
 import org.apache.hc.client5.http.impl.DefaultConnectionKeepAliveStrategy;
 import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
 import org.apache.hc.client5.http.impl.DefaultUserTokenHandler;
 import org.apache.hc.client5.http.impl.IdleConnectionEvictor;
+import org.apache.hc.client5.http.impl.NamedElementChain;
 import org.apache.hc.client5.http.impl.NoopUserTokenHandler;
 import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hc.client5.http.impl.protocol.DefaultRedirectStrategy;
 import org.apache.hc.client5.http.impl.routing.DefaultProxyRoutePlanner;
 import org.apache.hc.client5.http.impl.routing.DefaultRoutePlanner;
 import org.apache.hc.client5.http.impl.routing.SystemDefaultRoutePlanner;
+import org.apache.hc.client5.http.impl.sync.ChainElements;
+import org.apache.hc.client5.http.impl.sync.DefaultHttpRequestRetryHandler;
 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
+import org.apache.hc.client5.http.protocol.RedirectStrategy;
 import org.apache.hc.client5.http.protocol.RequestDefaultHeaders;
 import org.apache.hc.client5.http.protocol.RequestExpectContinue;
 import org.apache.hc.client5.http.protocol.UserTokenHandler;
 import org.apache.hc.client5.http.routing.HttpRoutePlanner;
+import org.apache.hc.client5.http.sync.HttpRequestRetryHandler;
+import org.apache.hc.core5.annotation.Internal;
 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
+import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.http.ConnectionReuseStrategy;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
@@ -66,8 +75,8 @@ import org.apache.hc.core5.http.config.H
 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.HandlerFactory;
+import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.http.protocol.HttpContext;
-import org.apache.hc.core5.http.protocol.HttpProcessor;
 import org.apache.hc.core5.http.protocol.HttpProcessorBuilder;
 import org.apache.hc.core5.http.protocol.RequestUserAgent;
 import org.apache.hc.core5.http2.HttpVersionPolicy;
@@ -75,10 +84,14 @@ import org.apache.hc.core5.http2.config.
 import org.apache.hc.core5.http2.protocol.H2RequestConnControl;
 import org.apache.hc.core5.http2.protocol.H2RequestContent;
 import org.apache.hc.core5.http2.protocol.H2RequestTargetHost;
+import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.pool.ConnPoolControl;
+import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOReactorException;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.TimeValue;
 import org.apache.hc.core5.util.VersionInfo;
 
@@ -108,6 +121,54 @@ import org.apache.hc.core5.util.VersionI
  */
 public class HttpAsyncClientBuilder {
 
+    private static class RequestInterceptorEntry {
+
+        enum Postion { FIRST, LAST }
+
+        final RequestInterceptorEntry.Postion postion;
+        final HttpRequestInterceptor interceptor;
+
+        private RequestInterceptorEntry(final RequestInterceptorEntry.Postion postion, final HttpRequestInterceptor interceptor) {
+            this.postion = postion;
+            this.interceptor = interceptor;
+        }
+    }
+
+    private static class ResponseInterceptorEntry {
+
+        enum Postion { FIRST, LAST }
+
+        final ResponseInterceptorEntry.Postion postion;
+        final HttpResponseInterceptor interceptor;
+
+        private ResponseInterceptorEntry(final ResponseInterceptorEntry.Postion postion, final HttpResponseInterceptor interceptor) {
+            this.postion = postion;
+            this.interceptor = interceptor;
+        }
+    }
+
+    private static class ExecInterceptorEntry {
+
+        enum Postion { BEFORE, AFTER, REPLACE }
+
+        final ExecInterceptorEntry.Postion postion;
+        final String name;
+        final AsyncExecChainHandler interceptor;
+        final String existing;
+
+        private ExecInterceptorEntry(
+                final ExecInterceptorEntry.Postion postion,
+                final String name,
+                final AsyncExecChainHandler interceptor,
+                final String existing) {
+            this.postion = postion;
+            this.name = name;
+            this.interceptor = interceptor;
+            this.existing = existing;
+        }
+
+    }
+
     private HttpVersionPolicy versionPolicy;
     private AsyncClientConnectionManager connManager;
     private boolean connManagerShared;
@@ -116,16 +177,19 @@ public class HttpAsyncClientBuilder {
     private H2Config h2Config;
     private CharCodingConfig charCodingConfig;
     private SchemePortResolver schemePortResolver;
-    private ConnectionReuseStrategy reuseStrategy;
     private ConnectionKeepAliveStrategy keepAliveStrategy;
     private UserTokenHandler userTokenHandler;
 
-    private LinkedList<HttpRequestInterceptor> requestFirst;
-    private LinkedList<HttpRequestInterceptor> requestLast;
-    private LinkedList<HttpResponseInterceptor> responseFirst;
-    private LinkedList<HttpResponseInterceptor> responseLast;
+    private LinkedList<RequestInterceptorEntry> requestInterceptors;
+    private LinkedList<ResponseInterceptorEntry> responseInterceptors;
+    private LinkedList<ExecInterceptorEntry> execInterceptors;
 
     private HttpRoutePlanner routePlanner;
+    private RedirectStrategy redirectStrategy;
+    private HttpRequestRetryHandler retryHandler;
+
+    private ConnectionReuseStrategy reuseStrategy;
+
     private String userAgent;
     private HttpHost proxy;
     private Collection<? extends Header> defaultHeaders;
@@ -135,6 +199,8 @@ public class HttpAsyncClientBuilder {
     private TimeValue maxIdleTime;
 
     private boolean systemProperties;
+    private boolean automaticRetriesDisabled;
+    private boolean redirectHandlingDisabled;
     private boolean connectionStateDisabled;
 
     private List<Closeable> closeables;
@@ -213,6 +279,8 @@ public class HttpAsyncClientBuilder {
 
     /**
      * Assigns {@link ConnectionReuseStrategy} instance.
+     * <p>
+     * Please note this strategy applies to HTTP/1.0 and HTTP/1.1 connections only
      */
     public final HttpAsyncClientBuilder setConnectionReuseStrategy(final ConnectionReuseStrategy reuseStrategy) {
         this.reuseStrategy = reuseStrategy;
@@ -240,90 +308,138 @@ public class HttpAsyncClientBuilder {
     }
 
     /**
-     * Disables connection state tracking.
+     * Adds this protocol interceptor to the head of the protocol processing list.
      */
-    public final HttpAsyncClientBuilder disableConnectionState() {
-        connectionStateDisabled = true;
+    public final HttpAsyncClientBuilder addRequestInterceptorFirst(final HttpResponseInterceptor interceptor) {
+        Args.notNull(interceptor, "Interceptor");
+        if (responseInterceptors == null) {
+            responseInterceptors = new LinkedList<>();
+        }
+        responseInterceptors.add(new ResponseInterceptorEntry(ResponseInterceptorEntry.Postion.FIRST, interceptor));
         return this;
     }
 
     /**
-     * Assigns {@link SchemePortResolver} instance.
+     * Adds this protocol interceptor to the tail of the protocol processing list.
      */
-    public final HttpAsyncClientBuilder setSchemePortResolver(final SchemePortResolver schemePortResolver) {
-        this.schemePortResolver = schemePortResolver;
+    public final HttpAsyncClientBuilder addResponseInterceptorLast(final HttpResponseInterceptor interceptor) {
+        Args.notNull(interceptor, "Interceptor");
+        if (responseInterceptors == null) {
+            responseInterceptors = new LinkedList<>();
+        }
+        responseInterceptors.add(new ResponseInterceptorEntry(ResponseInterceptorEntry.Postion.LAST, interceptor));
         return this;
     }
 
     /**
-     * Assigns {@code User-Agent} value.
+     * Adds this execution interceptor before an existing interceptor.
      */
-    public final HttpAsyncClientBuilder setUserAgent(final String userAgent) {
-        this.userAgent = userAgent;
+    public final HttpAsyncClientBuilder addExecInterceptorBefore(final String existing, final String name, final AsyncExecChainHandler interceptor) {
+        Args.notBlank(existing, "Existing");
+        Args.notBlank(name, "Name");
+        Args.notNull(interceptor, "Interceptor");
+        if (execInterceptors == null) {
+            execInterceptors = new LinkedList<>();
+        }
+        execInterceptors.add(new ExecInterceptorEntry(ExecInterceptorEntry.Postion.BEFORE, name, interceptor, existing));
         return this;
     }
 
     /**
-     * Assigns default request header values.
+     * Adds this execution interceptor after interceptor with the given name.
      */
-    public final HttpAsyncClientBuilder setDefaultHeaders(final Collection<? extends Header> defaultHeaders) {
-        this.defaultHeaders = defaultHeaders;
+    public final HttpAsyncClientBuilder addExecInterceptorAfter(final String existing, final String name, final AsyncExecChainHandler interceptor) {
+        Args.notBlank(existing, "Existing");
+        Args.notBlank(name, "Name");
+        Args.notNull(interceptor, "Interceptor");
+        if (execInterceptors == null) {
+            execInterceptors = new LinkedList<>();
+        }
+        execInterceptors.add(new ExecInterceptorEntry(ExecInterceptorEntry.Postion.AFTER, name, interceptor, existing));
         return this;
     }
 
     /**
-     * Adds this protocol interceptor to the head of the protocol processing list.
+     * Replace an existing interceptor with the given name with new interceptor.
      */
-    public final HttpAsyncClientBuilder addInterceptorFirst(final HttpResponseInterceptor itcp) {
-        if (itcp == null) {
-            return this;
+    public final HttpAsyncClientBuilder replaceExecInterceptor(final String existing, final AsyncExecChainHandler interceptor) {
+        Args.notBlank(existing, "Existing");
+        Args.notNull(interceptor, "Interceptor");
+        if (execInterceptors == null) {
+            execInterceptors = new LinkedList<>();
         }
-        if (responseFirst == null) {
-            responseFirst = new LinkedList<>();
+        execInterceptors.add(new ExecInterceptorEntry(ExecInterceptorEntry.Postion.REPLACE, existing, interceptor, existing));
+        return this;
+    }
+
+    /**
+     * Adds this protocol interceptor to the head of the protocol processing list.
+     */
+    public final HttpAsyncClientBuilder addRequestInterceptorFirst(final HttpRequestInterceptor interceptor) {
+        Args.notNull(interceptor, "Interceptor");
+        if (requestInterceptors == null) {
+            requestInterceptors = new LinkedList<>();
         }
-        responseFirst.addFirst(itcp);
+        requestInterceptors.add(new RequestInterceptorEntry(RequestInterceptorEntry.Postion.FIRST, interceptor));
         return this;
     }
 
     /**
      * Adds this protocol interceptor to the tail of the protocol processing list.
      */
-    public final HttpAsyncClientBuilder addInterceptorLast(final HttpResponseInterceptor itcp) {
-        if (itcp == null) {
-            return this;
-        }
-        if (responseLast == null) {
-            responseLast = new LinkedList<>();
+    public final HttpAsyncClientBuilder addResponseInterceptorLast(final HttpRequestInterceptor interceptor) {
+        Args.notNull(interceptor, "Interceptor");
+        if (requestInterceptors == null) {
+            requestInterceptors = new LinkedList<>();
         }
-        responseLast.addLast(itcp);
+        requestInterceptors.add(new RequestInterceptorEntry(RequestInterceptorEntry.Postion.LAST, interceptor));
         return this;
     }
 
     /**
-     * Adds this protocol interceptor to the head of the protocol processing list.
+     * Assigns {@link HttpRequestRetryHandler} instance.
+     * <p>
+     * Please note this value can be overridden by the {@link #disableAutomaticRetries()}
+     * method.
      */
-    public final HttpAsyncClientBuilder addInterceptorFirst(final HttpRequestInterceptor itcp) {
-        if (itcp == null) {
-            return this;
-        }
-        if (requestFirst == null) {
-            requestFirst = new LinkedList<>();
-        }
-        requestFirst.addFirst(itcp);
+    public final HttpAsyncClientBuilder setRetryHandler(final HttpRequestRetryHandler retryHandler) {
+        this.retryHandler = retryHandler;
         return this;
     }
 
     /**
-     * Adds this protocol interceptor to the tail of the protocol processing list.
+     * Assigns {@link RedirectStrategy} instance.
+     * <p>
+     * Please note this value can be overridden by the {@link #disableRedirectHandling()}
+     * method.
+     * </p>
      */
-    public final HttpAsyncClientBuilder addInterceptorLast(final HttpRequestInterceptor itcp) {
-        if (itcp == null) {
-            return this;
-        }
-        if (requestLast == null) {
-            requestLast = new LinkedList<>();
-        }
-        requestLast.addLast(itcp);
+    public HttpAsyncClientBuilder setRedirectStrategy(final RedirectStrategy redirectStrategy) {
+        this.redirectStrategy = redirectStrategy;
+        return this;
+    }
+
+    /**
+     * Assigns {@link SchemePortResolver} instance.
+     */
+    public final HttpAsyncClientBuilder setSchemePortResolver(final SchemePortResolver schemePortResolver) {
+        this.schemePortResolver = schemePortResolver;
+        return this;
+    }
+
+    /**
+     * Assigns {@code User-Agent} value.
+     */
+    public final HttpAsyncClientBuilder setUserAgent(final String userAgent) {
+        this.userAgent = userAgent;
+        return this;
+    }
+
+    /**
+     * Assigns default request header values.
+     */
+    public final HttpAsyncClientBuilder setDefaultHeaders(final Collection<? extends Header> defaultHeaders) {
+        this.defaultHeaders = defaultHeaders;
         return this;
     }
 
@@ -366,6 +482,30 @@ public class HttpAsyncClientBuilder {
     }
 
     /**
+     * Disables connection state tracking.
+     */
+    public final HttpAsyncClientBuilder disableConnectionState() {
+        connectionStateDisabled = true;
+        return this;
+    }
+
+    /**
+     * Disables automatic redirect handling.
+     */
+    public final HttpAsyncClientBuilder disableRedirectHandling() {
+        redirectHandlingDisabled = true;
+        return this;
+    }
+
+    /**
+     * Disables automatic request recovery and re-execution.
+     */
+    public final HttpAsyncClientBuilder disableAutomaticRetries() {
+        automaticRetriesDisabled = true;
+        return this;
+    }
+
+    /**
      * Makes this instance of HttpClient proactively evict expired connections from the
      * connection pool using a background thread.
      * <p>
@@ -407,8 +547,20 @@ public class HttpAsyncClientBuilder {
     }
 
     /**
+     * Request exec chain customization and extension.
+     * <p>
      * For internal use.
      */
+    @Internal
+    protected void customizeExecChain(final NamedElementChain<AsyncExecChainHandler> execChainDefinition) {
+    }
+
+    /**
+     * Adds to the list of {@link Closeable} resources to be managed by the client.
+     * <p>
+     * For internal use.
+     */
+    @Internal
     protected void addCloseable(final Closeable closeable) {
         if (closeable == null) {
             return;
@@ -439,6 +591,11 @@ public class HttpAsyncClientBuilder {
             }
         }
 
+        final NamedElementChain<AsyncExecChainHandler> execChainDefinition = new NamedElementChain<>();
+        execChainDefinition.addLast(
+                new AsyncMainClientExec(keepAliveStrategyCopy, userTokenHandlerCopy),
+                ChainElements.MAIN_TRANSPORT.name());
+
         String userAgentCopy = this.userAgent;
         if (userAgentCopy == null) {
             if (systemProperties) {
@@ -451,14 +608,18 @@ public class HttpAsyncClientBuilder {
         }
 
         final HttpProcessorBuilder b = HttpProcessorBuilder.create();
-        if (requestFirst != null) {
-            for (final HttpRequestInterceptor i: requestFirst) {
-                b.addFirst(i);
+        if (requestInterceptors != null) {
+            for (final RequestInterceptorEntry entry: requestInterceptors) {
+                if (entry.postion == RequestInterceptorEntry.Postion.FIRST) {
+                    b.addFirst(entry.interceptor);
+                }
             }
         }
-        if (responseFirst != null) {
-            for (final HttpResponseInterceptor i: responseFirst) {
-                b.addFirst(i);
+        if (responseInterceptors != null) {
+            for (final ResponseInterceptorEntry entry: responseInterceptors) {
+                if (entry.postion == ResponseInterceptorEntry.Postion.FIRST) {
+                    b.addFirst(entry.interceptor);
+                }
             }
         }
         b.addAll(
@@ -468,17 +629,35 @@ public class HttpAsyncClientBuilder {
                 new H2RequestConnControl(),
                 new RequestUserAgent(userAgentCopy),
                 new RequestExpectContinue());
-        if (requestLast != null) {
-            for (final HttpRequestInterceptor i: requestLast) {
-                b.addLast(i);
+        if (requestInterceptors != null) {
+            for (final RequestInterceptorEntry entry: requestInterceptors) {
+                if (entry.postion == RequestInterceptorEntry.Postion.LAST) {
+                    b.addFirst(entry.interceptor);
+                }
             }
         }
-        if (responseLast != null) {
-            for (final HttpResponseInterceptor i: responseLast) {
-                b.addLast(i);
+        if (responseInterceptors != null) {
+            for (final ResponseInterceptorEntry entry: responseInterceptors) {
+                if (entry.postion == ResponseInterceptorEntry.Postion.LAST) {
+                    b.addFirst(entry.interceptor);
+                }
             }
         }
-        final HttpProcessor httpProcessor = b.build();
+
+        execChainDefinition.addFirst(
+                new AsyncProtocolExec(b.build()),
+                ChainElements.PROTOCOL.name());
+
+        // Add request retry executor, if not disabled
+        if (!automaticRetriesDisabled) {
+            HttpRequestRetryHandler retryHandlerCopy = this.retryHandler;
+            if (retryHandlerCopy == null) {
+                retryHandlerCopy = DefaultHttpRequestRetryHandler.INSTANCE;
+            }
+            execChainDefinition.addFirst(
+                    new AsyncRetryExec(retryHandlerCopy),
+                    ChainElements.RETRY_IO_ERROR.name());
+        }
 
         HttpRoutePlanner routePlannerCopy = this.routePlanner;
         if (routePlannerCopy == null) {
@@ -495,6 +674,18 @@ public class HttpAsyncClientBuilder {
                 routePlannerCopy = new DefaultRoutePlanner(schemePortResolverCopy);
             }
         }
+
+        // Add redirect executor, if not disabled
+        if (!redirectHandlingDisabled) {
+            RedirectStrategy redirectStrategyCopy = this.redirectStrategy;
+            if (redirectStrategyCopy == null) {
+                redirectStrategyCopy = DefaultRedirectStrategy.INSTANCE;
+            }
+            execChainDefinition.addFirst(
+                    new AsyncRedirectExec(routePlannerCopy, redirectStrategyCopy),
+                    ChainElements.REDIRECT.name());
+        }
+
         List<Closeable> closeablesCopy = closeables != null ? new ArrayList<>(closeables) : null;
         if (!this.connManagerShared) {
             if (closeablesCopy == null) {
@@ -538,7 +729,7 @@ public class HttpAsyncClientBuilder {
         }
         final AsyncPushConsumerRegistry pushConsumerRegistry = new AsyncPushConsumerRegistry();
         final IOEventHandlerFactory ioEventHandlerFactory = new HttpAsyncClientEventHandlerFactory(
-                httpProcessor,
+                NoopHttpProcessor.INSTANCE,
                 new HandlerFactory<AsyncPushConsumer>() {
 
                     @Override
@@ -552,22 +743,56 @@ public class HttpAsyncClientBuilder {
                 h1Config != null ? h1Config : H1Config.DEFAULT,
                 charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT,
                 reuseStrategyCopy);
+        final DefaultConnectingIOReactor ioReactor;
         try {
-            return new InternalHttpAsyncClient(
+            ioReactor = new DefaultConnectingIOReactor(
                     ioEventHandlerFactory,
-                    pushConsumerRegistry,
                     ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT,
-                    new DefaultThreadFactory("httpclient-main", true),
                     new DefaultThreadFactory("httpclient-dispatch", true),
-                    connManagerCopy,
-                    routePlannerCopy,
-                    keepAliveStrategyCopy,
-                    userTokenHandlerCopy,
-                    defaultRequestConfig,
-                    closeablesCopy);
+                    new Callback<IOSession>() {
+
+                        @Override
+                        public void execute(final IOSession ioSession) {
+                            ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+                        }
+
+                    });
         } catch (final IOReactorException ex) {
             throw new IllegalStateException(ex.getMessage(), ex);
         }
+
+        if (execInterceptors != null) {
+            for (final ExecInterceptorEntry entry: execInterceptors) {
+                switch (entry.postion) {
+                    case AFTER:
+                        execChainDefinition.addAfter(entry.existing, entry.interceptor, entry.name);
+                        break;
+                    case BEFORE:
+                        execChainDefinition.addBefore(entry.existing, entry.interceptor, entry.name);
+                        break;
+                }
+            }
+        }
+
+        customizeExecChain(execChainDefinition);
+
+        NamedElementChain<AsyncExecChainHandler>.Node current = execChainDefinition.getLast();
+        AsyncExecChainElement execChain = null;
+        while (current != null) {
+            execChain = new AsyncExecChainElement(current.getValue(), execChain);
+            current = current.getPrevious();
+        }
+
+        return new InternalHttpAsyncClient(
+                ioReactor,
+                execChain,
+                pushConsumerRegistry,
+                new DefaultThreadFactory("httpclient-main", true),
+                connManagerCopy,
+                routePlannerCopy,
+                versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE,
+                defaultRequestConfig,
+                closeablesCopy);
     }
 
 }

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientEventHandlerFactory.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientEventHandlerFactory.java?rev=1793320&r1=1793319&r2=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientEventHandlerFactory.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientEventHandlerFactory.java Mon May  1 12:39:16 2017
@@ -33,7 +33,6 @@ import java.util.List;
 
 import org.apache.hc.client5.http.impl.ConnPoolSupport;
 import org.apache.hc.client5.http.impl.logging.LogAppendable;
-import org.apache.hc.client5.http.impl.logging.LoggingIOEventHandler;
 import org.apache.hc.client5.http.impl.logging.LoggingIOSession;
 import org.apache.hc.core5.annotation.Contract;
 import org.apache.hc.core5.annotation.ThreadingBehavior;
@@ -78,7 +77,7 @@ import org.apache.logging.log4j.Logger;
 @Contract(threading = ThreadingBehavior.IMMUTABLE)
 public class HttpAsyncClientEventHandlerFactory implements IOEventHandlerFactory {
 
-    private final Logger streamLog = LogManager.getLogger(ClientHttpProtocolNegotiator.class);
+    private final Logger streamLog = LogManager.getLogger(InternalHttpAsyncClient.class);
     private final Logger wireLog = LogManager.getLogger("org.apache.hc.client5.http.wire");
     private final Logger headerLog = LogManager.getLogger("org.apache.hc.client5.http.headers");
     private final Logger frameLog = LogManager.getLogger("org.apache.hc.client5.http2.frame");
@@ -282,15 +281,12 @@ public class HttpAsyncClientEventHandler
 
                     });
             final LoggingIOSession loggingIOSession = new LoggingIOSession(ioSession, id, sessionLog, wireLog);
-            return new LoggingIOEventHandler(
-                    new ClientHttpProtocolNegotiator(
+            return new ClientHttpProtocolNegotiator(
                             loggingIOSession,
                             http1StreamHandlerFactory,
                             http2StreamHandlerFactory,
                             attachment instanceof HttpVersionPolicy ? (HttpVersionPolicy) attachment : versionPolicy,
-                            connectionListener),
-                    id,
-                    streamLog);
+                            connectionListener);
         } else {
             final ClientHttp1StreamDuplexerFactory http1StreamHandlerFactory = new ClientHttp1StreamDuplexerFactory(
                     httpProcessor,

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java?rev=1793320&r1=1793319&r2=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java Mon May  1 12:39:16 2017
@@ -66,6 +66,13 @@ public class HttpAsyncClients {
     }
 
     /**
+     * Creates {@link CloseableHttpAsyncClient} instance with default configuration.
+     */
+    public static CloseableHttpAsyncClient createDefault() {
+        return HttpAsyncClientBuilder.create().build();
+    }
+
+    /**
      * Creates {@link CloseableHttpAsyncClient} instance with default
      * configuration and system properties.
      */
@@ -85,12 +92,15 @@ public class HttpAsyncClients {
     private static MinimalHttpAsyncClient createMinimalImpl(
             final IOEventHandlerFactory eventHandlerFactory,
             final AsyncPushConsumerRegistry pushConsumerRegistry,
+            final HttpVersionPolicy versionPolicy,
+            final IOReactorConfig ioReactorConfig,
             final AsyncClientConnectionManager connmgr) {
         try {
             return new MinimalHttpAsyncClient(
                     eventHandlerFactory,
                     pushConsumerRegistry,
-                    IOReactorConfig.DEFAULT,
+                    versionPolicy,
+                    ioReactorConfig,
                     new DefaultThreadFactory("httpclient-main", true),
                     new DefaultThreadFactory("httpclient-dispatch", true),
                     connmgr);
@@ -99,10 +109,15 @@ public class HttpAsyncClients {
         }
     }
 
+    /**
+     * Creates {@link MinimalHttpAsyncClient} instance that provides
+     * essential HTTP/1.1 and HTTP/2 message transport only.
+     */
     public static MinimalHttpAsyncClient createMinimal(
             final HttpVersionPolicy versionPolicy,
             final H2Config h2Config,
             final H1Config h1Config,
+            final IOReactorConfig ioReactorConfig,
             final AsyncClientConnectionManager connmgr) {
         return createMinimalImpl(
                 new HttpAsyncClientEventHandlerFactory(
@@ -114,30 +129,80 @@ public class HttpAsyncClients {
                         CharCodingConfig.DEFAULT,
                         DefaultConnectionReuseStrategy.INSTANCE),
                 new AsyncPushConsumerRegistry(),
+                versionPolicy,
+                ioReactorConfig,
                 connmgr);
     }
 
     /**
-     * Creates {@link CloseableHttpAsyncClient} instance that provides
+     * Creates {@link MinimalHttpAsyncClient} instance that provides
+     * essential HTTP/1.1 and HTTP/2 message transport only.
+     */
+    public static MinimalHttpAsyncClient createMinimal(
+            final HttpVersionPolicy versionPolicy,
+            final H2Config h2Config,
+            final H1Config h1Config,
+            final IOReactorConfig ioReactorConfig) {
+        return createMinimal(versionPolicy, h2Config, h1Config, ioReactorConfig,
+                PoolingAsyncClientConnectionManagerBuilder.create().build());
+    }
+
+    /**
+     * Creates {@link MinimalHttpAsyncClient} instance that provides
+     * essential HTTP/1.1 and HTTP/2 message transport only.
+     */
+    public static MinimalHttpAsyncClient createMinimal(
+            final HttpVersionPolicy versionPolicy,
+            final H2Config h2Config,
+            final H1Config h1Config) {
+        return createMinimal(versionPolicy, h2Config, h1Config, IOReactorConfig.DEFAULT);
+    }
+
+    /**
+     * Creates {@link MinimalHttpAsyncClient} instance that provides
      * essential HTTP/1.1 and HTTP/2 message transport only.
      */
-    public static CloseableHttpAsyncClient createMinimal() {
+    public static MinimalHttpAsyncClient createMinimal() {
         return createMinimal(
                 HttpVersionPolicy.NEGOTIATE,
                 H2Config.DEFAULT,
+                H1Config.DEFAULT);
+    }
+
+    /**
+     * Creates {@link MinimalHttpAsyncClient} instance that provides
+     * essential HTTP/1.1 transport only.
+     */
+    public static MinimalHttpAsyncClient createMinimal(final H1Config h1Config, final IOReactorConfig ioReactorConfig) {
+        return createMinimal(
+                HttpVersionPolicy.FORCE_HTTP_1,
+                H2Config.DEFAULT,
+                h1Config,
+                ioReactorConfig);
+    }
+
+    /**
+     * Creates {@link MinimalHttpAsyncClient} instance that provides
+     * essential HTTP/2 transport only.
+     */
+    public static MinimalHttpAsyncClient createMinimal(final H2Config h2Config, final IOReactorConfig ioReactorConfig) {
+        return createMinimal(
+                HttpVersionPolicy.FORCE_HTTP_2,
+                h2Config,
                 H1Config.DEFAULT,
-                PoolingAsyncClientConnectionManagerBuilder.create().build());
+                ioReactorConfig);
     }
 
     /**
-     * Creates {@link CloseableHttpAsyncClient} instance that provides
+     * Creates {@link MinimalHttpAsyncClient} instance that provides
      * essential HTTP/1.1 and HTTP/2 message transport only.
      */
-    public static CloseableHttpAsyncClient createMinimal(final AsyncClientConnectionManager connManager) {
+    public static MinimalHttpAsyncClient createMinimal(final AsyncClientConnectionManager connManager) {
         return createMinimal(
                 HttpVersionPolicy.NEGOTIATE,
                 H2Config.DEFAULT,
                 H1Config.DEFAULT,
+                IOReactorConfig.DEFAULT,
                 connManager);
     }
 

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java?rev=1793320&r1=1793319&r2=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java Mon May  1 12:39:16 2017
@@ -28,77 +28,61 @@ package org.apache.hc.client5.http.impl.
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
 import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.async.AsyncExecCallback;
+import org.apache.hc.client5.http.async.AsyncExecChain;
+import org.apache.hc.client5.http.async.AsyncExecRuntime;
 import org.apache.hc.client5.http.config.Configurable;
 import org.apache.hc.client5.http.config.RequestConfig;
-import org.apache.hc.client5.http.impl.ConnPoolSupport;
+import org.apache.hc.client5.http.impl.ExecSupport;
 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
-import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
-import org.apache.hc.client5.http.protocol.UserTokenHandler;
 import org.apache.hc.client5.http.routing.HttpRoutePlanner;
 import org.apache.hc.core5.concurrent.BasicFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.message.RequestLine;
-import org.apache.hc.core5.http.message.StatusLine;
-import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
-import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.http.nio.DataStreamChannel;
 import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
 import org.apache.hc.core5.http.protocol.HttpContext;
-import org.apache.hc.core5.reactor.IOEventHandlerFactory;
-import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.reactor.IOReactorException;
-import org.apache.hc.core5.util.Args;
-import org.apache.hc.core5.util.Asserts;
-import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
 
 class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
 
-    private final static AtomicLong COUNT = new AtomicLong(0);
-
     private final AsyncClientConnectionManager connmgr;
+    private final AsyncExecChainElement execChain;
     private final HttpRoutePlanner routePlanner;
-    private final ConnectionKeepAliveStrategy keepAliveStrategy;
-    private final UserTokenHandler userTokenHandler;
+    private final HttpVersionPolicy versionPolicy;
     private final RequestConfig defaultConfig;
     private final List<Closeable> closeables;
 
     InternalHttpAsyncClient(
-            final IOEventHandlerFactory eventHandlerFactory,
+            final DefaultConnectingIOReactor ioReactor,
+            final AsyncExecChainElement execChain,
             final AsyncPushConsumerRegistry pushConsumerRegistry,
-            final IOReactorConfig reactorConfig,
             final ThreadFactory threadFactory,
-            final ThreadFactory workerThreadFactory,
             final AsyncClientConnectionManager connmgr,
             final HttpRoutePlanner routePlanner,
-            final ConnectionKeepAliveStrategy keepAliveStrategy,
-            final UserTokenHandler userTokenHandler,
+            final HttpVersionPolicy versionPolicy,
             final RequestConfig defaultConfig,
-            final List<Closeable> closeables) throws IOReactorException {
-        super(eventHandlerFactory, pushConsumerRegistry, reactorConfig, threadFactory, workerThreadFactory);
+            final List<Closeable> closeables) {
+        super(ioReactor, pushConsumerRegistry, threadFactory);
         this.connmgr = connmgr;
+        this.execChain = execChain;
         this.routePlanner = routePlanner;
-        this.keepAliveStrategy = keepAliveStrategy;
-        this.userTokenHandler = userTokenHandler;
+        this.versionPolicy = versionPolicy;
         this.defaultConfig = defaultConfig;
         this.closeables = closeables;
     }
@@ -117,97 +101,72 @@ class InternalHttpAsyncClient extends Ab
         }
     }
 
-    private void leaseEndpoint(
+    private void setupContext(final HttpClientContext context) {
+        if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
+            context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
+        }
+    }
+
+    private void executeChain(
+            final String exchangeId,
+            final AsyncExecChainElement execChain,
             final HttpRoute route,
-            final Object userToken,
+            final HttpRequest request,
+            final EntityDetails entityDetails,
+            final AsyncClientExchangeHandler exchangeHandler,
             final HttpClientContext clientContext,
-            final FutureCallback<AsyncConnectionEndpoint> callback) {
-        final RequestConfig requestConfig = clientContext.getRequestConfig();
-        connmgr.lease(route, userToken, requestConfig.getConnectTimeout(),
-                new FutureCallback<AsyncConnectionEndpoint>() {
+            final AsyncExecRuntime execRuntime) throws IOException, HttpException {
 
-                    @Override
-                    public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
-                        if (connectionEndpoint.isConnected()) {
-                            callback.completed(connectionEndpoint);
-                        } else {
-                            connmgr.connect(
-                                    connectionEndpoint,
-                                    getConnectionInitiator(),
-                                    requestConfig.getConnectTimeout(),
-                                    clientContext,
-                                    new FutureCallback<AsyncConnectionEndpoint>() {
-
-                                        @Override
-                                        public void completed(final AsyncConnectionEndpoint result) {
-                                            callback.completed(result);
-                                        }
-
-                                        @Override
-                                        public void failed(final Exception ex) {
-                                            callback.failed(ex);
-                                        }
-
-                                        @Override
-                                        public void cancelled() {
-                                            callback.cancelled();
-                                        }
+        if (log.isDebugEnabled()) {
+            log.debug(exchangeId + ": preparing request execution");
+        }
 
-                                    });
-                        }
+        setupContext(clientContext);
+
+        final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, clientContext, execRuntime);
+        execChain.execute(
+                ExecSupport.copy(request),
+                entityDetails != null ? new BasicAsyncEntityProducer(exchangeHandler, entityDetails) : null,
+                scope,
+                new AsyncExecCallback() {
+
+                    @Override
+                    public AsyncDataConsumer handleResponse(
+                            final HttpResponse response,
+                            final EntityDetails entityDetails) throws HttpException, IOException {
+                        exchangeHandler.consumeResponse(response, entityDetails);
+                        return exchangeHandler;
                     }
 
                     @Override
-                    public void failed(final Exception ex) {
-                        callback.failed(ex);
+                    public void completed() {
+                        if (log.isDebugEnabled()) {
+                            log.debug(exchangeId + ": message exchange successfully completed");
+                        }
+                        try {
+                            exchangeHandler.releaseResources();
+                        } finally {
+                            execRuntime.releaseConnection();
+                        }
                     }
 
                     @Override
-                    public void cancelled() {
-                        callback.cancelled();
+                    public void failed(final Exception cause) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(exchangeId + ": request failed: " + cause.getMessage());
+                        }
+                        try {
+                            exchangeHandler.failed(cause);
+                            exchangeHandler.releaseResources();
+                        } finally {
+                            execRuntime.discardConnection();
+                        }
                     }
 
                 });
     }
 
     @Override
-    public Future<AsyncClientEndpoint> lease(
-            final HttpHost host,
-            final HttpContext context,
-            final FutureCallback<AsyncClientEndpoint> callback) {
-        Args.notNull(host, "Host");
-        Args.notNull(context, "HTTP context");
-        ensureRunning();
-        final HttpClientContext clientContext = HttpClientContext.adapt(context);
-        final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
-        try {
-            final HttpRoute route = routePlanner.determineRoute(host, clientContext);
-            final Object userToken = clientContext.getUserToken();
-            leaseEndpoint(route, userToken, clientContext, new FutureCallback<AsyncConnectionEndpoint>() {
-
-                @Override
-                public void completed(final AsyncConnectionEndpoint result) {
-                    future.completed(new InternalAsyncClientEndpoint(route, result));
-                }
-
-                @Override
-                public void failed(final Exception ex) {
-                    future.failed(ex);
-                }
-
-                @Override
-                public void cancelled() {
-                    future.cancel(true);
-                }
-
-            });
-        } catch (final HttpException ex) {
-            future.failed(ex);
-        }
-        return future;
-    }
-
-    @Override
     public <T> Future<T> execute(
             final AsyncRequestProducer requestProducer,
             final AsyncResponseConsumer<T> responseConsumer,
@@ -217,7 +176,7 @@ class InternalHttpAsyncClient extends Ab
         final BasicFuture<T> future = new BasicFuture<>(callback);
         try {
             final HttpClientContext clientContext = HttpClientContext.adapt(context);
-            final HttpRequest request = requestProducer.produceRequest();
+
             RequestConfig requestConfig = null;
             if (requestProducer instanceof Configurable) {
                 requestConfig = ((Configurable) requestProducer).getConfig();
@@ -225,36 +184,12 @@ class InternalHttpAsyncClient extends Ab
             if (requestConfig != null) {
                 clientContext.setRequestConfig(requestConfig);
             }
-            final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
-            final HttpRoute route = routePlanner.determineRoute(target, clientContext);
-            final Object userToken = clientContext.getUserToken();
-            leaseEndpoint(route, userToken, clientContext, new FutureCallback<AsyncConnectionEndpoint>() {
-
-                @Override
-                public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
-                    final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(route, connectionEndpoint);
-                    endpoint.execute(requestProducer, responseConsumer, clientContext, new FutureCallback<T>() {
-
-                        @Override
-                        public void completed(final T result) {
-                            endpoint.releaseAndReuse();
-                            future.completed(result);
-                        }
 
-                        @Override
-                        public void failed(final Exception ex) {
-                            endpoint.releaseAndDiscard();
-                            future.failed(ex);
-                        }
-
-                        @Override
-                        public void cancelled() {
-                            endpoint.releaseAndDiscard();
-                            future.cancel();
-                        }
-
-                    });
+            final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
 
+                @Override
+                public void completed(final T result) {
+                    future.completed(result);
                 }
 
                 @Override
@@ -268,185 +203,26 @@ class InternalHttpAsyncClient extends Ab
                 }
 
             });
-        } catch (final HttpException ex) {
-            future.failed(ex);
-        }
-        return future;
-    }
-
-    private void setupContext(final HttpClientContext context) {
-        if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
-            context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
-        }
-    }
-
-    private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
-
-        private final HttpRoute route;
-        private final AsyncConnectionEndpoint connectionEndpoint;
-        private final AtomicBoolean reusable;
-        private final AtomicReference<Object> userTokenRef;
-        private final AtomicReference<TimeValue> keepAliveRef;
-        private final AtomicBoolean released;
-
-        InternalAsyncClientEndpoint(final HttpRoute route, final AsyncConnectionEndpoint connectionEndpoint) {
-            this.route = route;
-            this.connectionEndpoint = connectionEndpoint;
-            this.reusable = new AtomicBoolean(true);
-            this.keepAliveRef = new AtomicReference<>(TimeValue.NEG_ONE_MILLISECONDS);
-            this.userTokenRef = new AtomicReference<>(null);
-            this.released = new AtomicBoolean(false);
-        }
-
-        @Override
-        public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
-            Asserts.check(!released.get(), ConnPoolSupport.getId(connectionEndpoint) + " endpoint has already been released");
-
-            final HttpClientContext clientContext = HttpClientContext.adapt(context);
-            setupContext(clientContext);
-
-            connectionEndpoint.execute(new AsyncClientExchangeHandler() {
-
-                private final String id = Long.toString(COUNT.incrementAndGet());
-
-                void updateState() {
-                    reusable.set(true);
-                    Object userToken = clientContext.getUserToken();
-                    if (userToken == null) {
-                        userToken = userTokenHandler.getUserToken(route, context);
-                        context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
-                    }
-                    userTokenRef.set(userToken);
-                }
-
-                @Override
-                public void produceRequest(
-                        final RequestChannel channel) throws HttpException, IOException {
-                    exchangeHandler.produceRequest(log.isDebugEnabled() ? new RequestChannel() {
-
-                        @Override
-                        public void sendRequest(
-                                final HttpRequest request,
-                                final EntityDetails entityDetails) throws HttpException, IOException {
-                            if (log.isDebugEnabled()) {
-                                log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id  + ": request " + new RequestLine(request));
-                            }
-                            channel.sendRequest(request, entityDetails);
-                        }
-
-                    } : channel);
-                }
-
-                @Override
-                public int available() {
-                    return exchangeHandler.available();
-                }
-
-                @Override
-                public void produce(final DataStreamChannel channel) throws IOException {
-                    exchangeHandler.produce(channel);
-                }
+            exchangeHandler.produceRequest(new RequestChannel() {
 
                 @Override
-                public void consumeResponse(
-                        final HttpResponse response,
+                public void sendRequest(
+                        final HttpRequest request,
                         final EntityDetails entityDetails) throws HttpException, IOException {
-                    if (log.isDebugEnabled()) {
-                        log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id  + ": response " + new StatusLine(response));
-                    }
-                    exchangeHandler.consumeResponse(response, entityDetails);
-
-                    keepAliveRef.set(keepAliveStrategy.getKeepAliveDuration(response, context));
-
-                    if (entityDetails == null) {
-                        updateState();
-                        if (log.isDebugEnabled()) {
-                            log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id  + ": completed");
-                        }
-                    }
-                }
-
-                @Override
-                public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
-                    if (log.isDebugEnabled()) {
-                        log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id  + ": intermediate response " + new StatusLine(response));
-                    }
-                    exchangeHandler.consumeInformation(response);
-                }
 
-                @Override
-                public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-                    exchangeHandler.updateCapacity(capacityChannel);
+                    final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
+                    final HttpRoute route = routePlanner.determineRoute(target, clientContext);
+                    final String exchangeId = "ex-" + Long.toHexString(ExecSupport.getNextExecNumber());
+                    final AsyncExecRuntime execRuntime = new AsyncExecRuntimeImpl(log, connmgr, getConnectionInitiator(), versionPolicy);
+                    executeChain(exchangeId, execChain, route, request, entityDetails, exchangeHandler, clientContext, execRuntime);
                 }
 
-                @Override
-                public int consume(final ByteBuffer src) throws IOException {
-                    return exchangeHandler.consume(src);
-                }
-
-                @Override
-                public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
-                    if (log.isDebugEnabled()) {
-                        log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id  + ": completed");
-                    }
-                    exchangeHandler.streamEnd(trailers);
-                    updateState();
-                }
-
-                @Override
-                public void failed(final Exception cause) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id  + ": failed", cause);
-                    }
-                    reusable.set(false);
-                    exchangeHandler.failed(cause);
-                }
-
-                @Override
-                public void cancel() {
-                    if (log.isDebugEnabled()) {
-                        log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id  + ": cancelled");
-                    }
-                    reusable.set(false);
-                    exchangeHandler.cancel();
-                }
-
-                @Override
-                public void releaseResources() {
-                    exchangeHandler.releaseResources();
-                }
-
-            }, clientContext);
-        }
-
-        private void closeEndpoint() {
-            try {
-                connectionEndpoint.close();
-            } catch (final IOException ex) {
-                log.debug("I/O error closing connection endpoint: " + ex.getMessage(), ex);
-            }
-        }
-
-        @Override
-        public void releaseAndReuse() {
-            if (released.compareAndSet(false, true)) {
-                if (!reusable.get()) {
-                    closeEndpoint();
-                    connmgr.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECONDS);
-                } else {
-                    connmgr.release(connectionEndpoint, userTokenRef.get(), keepAliveRef.get());
-                }
-            }
-        }
+            });
 
-        @Override
-        public void releaseAndDiscard() {
-            if (released.compareAndSet(false, true)) {
-                closeEndpoint();
-                connmgr.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECONDS);
-            }
+        } catch (HttpException | IOException ex) {
+            future.failed(ex);
         }
-
+        return future;
     }
 
 }

Added: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java?rev=1793320&view=auto
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java (added)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java Mon May  1 12:39:16 2017
@@ -0,0 +1,198 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.util.Identifiable;
+import org.apache.logging.log4j.Logger;
+
+final class LoggingAsyncClientExchangeHandler implements AsyncClientExchangeHandler, Identifiable {
+
+    private final Logger log;
+    private final String exchangeId;
+    private final AsyncClientExchangeHandler handler;
+
+    LoggingAsyncClientExchangeHandler(final Logger log, final String exchangeId, final AsyncClientExchangeHandler handler) {
+        this.log = log;
+        this.exchangeId = exchangeId;
+        this.handler = handler;
+    }
+
+    @Override
+    public String getId() {
+        return exchangeId;
+    }
+
+    @Override
+    public void releaseResources() {
+        handler.releaseResources();
+    }
+
+    @Override
+    public void produceRequest(final RequestChannel channel) throws HttpException, IOException {
+        handler.produceRequest(new RequestChannel() {
+
+            @Override
+            public void sendRequest(
+                    final HttpRequest request,
+                    final EntityDetails entityDetails) throws HttpException, IOException {
+                if (log.isDebugEnabled()) {
+                    log.debug(exchangeId + ": send request " + new RequestLine(request) + ", " +
+                            (entityDetails != null ? "entity len " + entityDetails.getContentLength() : "null entity"));
+                }
+                channel.sendRequest(request, entityDetails);
+            }
+
+        });
+    }
+
+    @Override
+    public int available() {
+        return handler.available();
+    }
+
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug(exchangeId + ": produce request data");
+        }
+        handler.produce(new DataStreamChannel() {
+
+            @Override
+            public void requestOutput() {
+                channel.requestOutput();
+            }
+
+            @Override
+            public int write(final ByteBuffer src) throws IOException {
+                if (log.isDebugEnabled()) {
+                    log.debug(exchangeId + ": produce request data, len " + src.remaining() + " bytes");
+                }
+                return channel.write(src);
+            }
+
+            @Override
+            public void endStream() throws IOException {
+                if (log.isDebugEnabled()) {
+                    log.debug(exchangeId + ": end of request data");
+                }
+                channel.endStream();
+            }
+
+            @Override
+            public void endStream(final List<? extends Header> trailers) throws IOException {
+                if (log.isDebugEnabled()) {
+                    log.debug(exchangeId + ": end of request data");
+                }
+                channel.endStream(trailers);
+            }
+
+        });
+    }
+
+    @Override
+    public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
+        if (log.isDebugEnabled()) {
+            log.debug(exchangeId + ": information response " + new StatusLine(response));
+        }
+        handler.consumeInformation(response);
+    }
+
+    @Override
+    public void consumeResponse(
+            final HttpResponse response,
+            final EntityDetails entityDetails) throws HttpException, IOException {
+        if (log.isDebugEnabled()) {
+            log.debug(exchangeId + ": consume response " + new StatusLine(response) + ", " +
+                    (entityDetails != null ? "entity len " + entityDetails.getContentLength() : " null entity"));
+        }
+        handler.consumeResponse(response, entityDetails);
+    }
+
+
+    @Override
+    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        handler.updateCapacity(new CapacityChannel() {
+
+            @Override
+            public void update(final int increment) throws IOException {
+                if (log.isDebugEnabled()) {
+                    log.debug(exchangeId + ": capacity update " + increment);
+                }
+                capacityChannel.update(increment);
+            }
+
+        });
+    }
+
+    @Override
+    public int consume(final ByteBuffer src) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug(exchangeId + ": consume response data, len " + src.remaining() + " bytes");
+        }
+        return handler.consume(src);
+    }
+
+    @Override
+    public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        if (log.isDebugEnabled()) {
+            log.debug(exchangeId + ": end of response data");
+        }
+        handler.streamEnd(trailers);
+    }
+
+    @Override
+    public void failed(final Exception cause) {
+        if (log.isDebugEnabled()) {
+            log.debug(exchangeId + ": execution failed: " + cause.getMessage());
+        }
+        handler.failed(cause);
+    }
+
+    @Override
+    public void cancel() {
+        if (log.isDebugEnabled()) {
+            log.debug(exchangeId + ": execution cancelled");
+        }
+        handler.cancel();
+    }
+
+}

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java?rev=1793320&r1=1793319&r2=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java Mon May  1 12:39:16 2017
@@ -34,6 +34,8 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.hc.client5.http.HttpRoute;
 import org.apache.hc.client5.http.config.Configurable;
 import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.ConnPoolSupport;
+import org.apache.hc.client5.http.impl.ExecSupport;
 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -41,32 +43,54 @@ import org.apache.hc.core5.concurrent.Ba
 import org.apache.hc.core5.concurrent.Cancellable;
 import org.apache.hc.core5.concurrent.ComplexFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOReactorException;
+import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.Asserts;
 import org.apache.hc.core5.util.TimeValue;
 
-class MinimalHttpAsyncClient extends AbstractHttpAsyncClientBase {
+public class MinimalHttpAsyncClient extends AbstractHttpAsyncClientBase {
 
     private final AsyncClientConnectionManager connmgr;
+    private final HttpVersionPolicy versionPolicy;
 
-    public MinimalHttpAsyncClient(
+    MinimalHttpAsyncClient(
             final IOEventHandlerFactory eventHandlerFactory,
             final AsyncPushConsumerRegistry pushConsumerRegistry,
+            final HttpVersionPolicy versionPolicy,
             final IOReactorConfig reactorConfig,
             final ThreadFactory threadFactory,
             final ThreadFactory workerThreadFactory,
             final AsyncClientConnectionManager connmgr) throws IOReactorException {
-        super(eventHandlerFactory, pushConsumerRegistry, reactorConfig, threadFactory, workerThreadFactory);
+        super(new DefaultConnectingIOReactor(
+                eventHandlerFactory,
+                reactorConfig,
+                workerThreadFactory,
+                new Callback<IOSession>() {
+
+                    @Override
+                    public void execute(final IOSession ioSession) {
+                        ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+                    }
+
+                }),
+                pushConsumerRegistry,
+                threadFactory);
+        this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
         this.connmgr = connmgr;
     }
 
@@ -77,8 +101,7 @@ class MinimalHttpAsyncClient extends Abs
             final FutureCallback<AsyncConnectionEndpoint> callback) {
         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
         final Future<AsyncConnectionEndpoint> leaseFuture = connmgr.lease(
-                new HttpRoute(host), null,
-                connectTimeout,
+                new HttpRoute(host), null, connectTimeout,
                 new FutureCallback<AsyncConnectionEndpoint>() {
 
                     @Override
@@ -90,6 +113,7 @@ class MinimalHttpAsyncClient extends Abs
                                     connectionEndpoint,
                                     getConnectionInitiator(),
                                     connectTimeout,
+                                    versionPolicy,
                                     clientContext,
                                     new FutureCallback<AsyncConnectionEndpoint>() {
 
@@ -128,7 +152,12 @@ class MinimalHttpAsyncClient extends Abs
         return resultFuture;
     }
 
-    @Override
+    public final Future<AsyncClientEndpoint> lease(
+            final HttpHost host,
+            final FutureCallback<AsyncClientEndpoint> callback) {
+        return lease(host, HttpClientContext.create(), callback);
+    }
+
     public Future<AsyncClientEndpoint> lease(
             final HttpHost host,
             final HttpContext context,
@@ -253,7 +282,14 @@ class MinimalHttpAsyncClient extends Abs
         @Override
         public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
             Asserts.check(!released.get(), "Endpoint has already been released");
-            connectionEndpoint.execute(exchangeHandler, context);
+
+            final String exchangeId = Long.toHexString(ExecSupport.getNextExecNumber());
+            if (log.isDebugEnabled()) {
+                log.debug(ConnPoolSupport.getId(connectionEndpoint) + ": executing message exchange " + exchangeId);
+            }
+            connectionEndpoint.execute(
+                    log.isDebugEnabled() ? new LoggingAsyncClientExchangeHandler(log, exchangeId, exchangeHandler) : exchangeHandler,
+                    context);
         }
 
         @Override

Copied: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/NoopHttpProcessor.java (from r1793319, httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/SimpleResponseConsumer.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/NoopHttpProcessor.java?p2=httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/NoopHttpProcessor.java&p1=httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/SimpleResponseConsumer.java&r1=1793319&r2=1793320&rev=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/SimpleResponseConsumer.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/NoopHttpProcessor.java Mon May  1 12:39:16 2017
@@ -24,21 +24,30 @@
  * <http://www.apache.org/>.
  *
  */
-package org.apache.hc.client5.http.async.methods;
 
-import org.apache.hc.core5.http.ContentType;
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.IOException;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
 
-public final class SimpleResponseConsumer extends AbstractAsyncResponseConsumer<SimpleHttpResponse, String> {
+final class NoopHttpProcessor implements HttpProcessor {
 
-    public SimpleResponseConsumer() {
-        super(new StringAsyncEntityConsumer());
+    static final NoopHttpProcessor INSTANCE = new NoopHttpProcessor();
+
+    @Override
+    public void process(
+            final HttpRequest request, final EntityDetails entity, final HttpContext context) throws HttpException, IOException {
     }
 
     @Override
-    protected SimpleHttpResponse buildResult(final HttpResponse response, final String entity, final ContentType contentType) {
-        return new SimpleHttpResponse(response, entity, contentType);
+    public void process(
+            final HttpResponse response, final EntityDetails entity, final HttpContext context) throws HttpException, IOException {
     }
 
-}
\ No newline at end of file
+}



Mime
View raw message