hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1784909 - in /httpcomponents/httpclient/trunk/httpclient5/src: examples/org/apache/hc/client5/http/examples/ main/java/org/apache/hc/client5/http/async/ main/java/org/apache/hc/client5/http/async/methods/ main/java/org/apache/hc/client5/ht...
Date Wed, 01 Mar 2017 10:47:23 GMT
Author: olegk
Date: Wed Mar  1 10:47:22 2017
New Revision: 1784909

URL: http://svn.apache.org/viewvc?rev=1784909&view=rev
Log:
Client side APIs for HTTP/2 server push handling

Added:
    httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2ServerPush.java   (with props)
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinDataConsumer.java
      - copied, changed from r1784141, httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinPushConsumer.java
      - copied, changed from r1784141, httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharDataConsumer.java
      - copied, changed from r1784141, httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharResponseConsumer.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharPushConsumer.java
      - copied, changed from r1784141, httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncPushConsumerRegistry.java   (with props)
Modified:
    httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2Multiplexing.java
    httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttpExchangeStreaming.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharResponseConsumer.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractHttpAsyncClientBase.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
    httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java

Modified: httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2Multiplexing.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2Multiplexing.java?rev=1784909&r1=1784908&r2=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2Multiplexing.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2Multiplexing.java Wed Mar  1 10:47:22 2017
@@ -40,6 +40,7 @@ import org.apache.hc.client5.http.impl.a
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http2.config.H2Config;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 
 /**
@@ -55,9 +56,14 @@ public class AsyncClientHttp2Multiplexin
                 .setSoTimeout(5000)
                 .build();
 
+        H2Config h2Config = H2Config.custom()
+                .setPushEnabled(false)
+                .build();
+
         CloseableHttpAsyncClient client = HttpAsyncClients.custom()
-                .setProtocolVersion(HttpVersion.HTTP_1_1)
                 .setIOReactorConfig(ioReactorConfig)
+                .setProtocolVersion(HttpVersion.HTTP_2)
+                .setH2Config(h2Config)
                 .build();
 
         client.start();

Added: httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2ServerPush.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2ServerPush.java?rev=1784909&view=auto
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2ServerPush.java (added)
+++ httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2ServerPush.java Wed Mar  1 10:47:22 2017
@@ -0,0 +1,160 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.examples;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.client5.http.async.methods.AbstractBinPushConsumer;
+import org.apache.hc.client5.http.async.methods.AbstractCharResponseConsumer;
+import org.apache.hc.client5.http.async.methods.AsyncRequestBuilder;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+
+/**
+ * This example demonstrates handling of HTTP/2 message exchanges pushed by the server.
+ */
+public class AsyncClientHttp2ServerPush {
+
+    public static void main(String[] args) throws Exception {
+
+        IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+                .setConnectTimeout(5000)
+                .setSoTimeout(5000)
+                .build();
+
+        H2Config h2Config = H2Config.custom()
+                .setPushEnabled(true)
+                .build();
+
+        CloseableHttpAsyncClient client = HttpAsyncClients.custom()
+                .setIOReactorConfig(ioReactorConfig)
+                .setProtocolVersion(HttpVersion.HTTP_2)
+                .setH2Config(h2Config)
+                .build();
+
+        client.start();
+
+        client.register("*", new Supplier<AsyncPushConsumer>() {
+
+            @Override
+            public AsyncPushConsumer get() {
+                return new AbstractBinPushConsumer() {
+
+                    @Override
+                    protected void start(
+                            final HttpRequest promise,
+                            final HttpResponse response,
+                            final ContentType contentType) throws HttpException, IOException {
+                        System.out.println(promise.getPath() + " (push)->" + new StatusLine(response));
+                    }
+
+                    @Override
+                    protected int capacity() {
+                        return Integer.MAX_VALUE;
+                    }
+
+                    @Override
+                    protected void data(final ByteBuffer data, final boolean endOfStream) throws IOException {
+                    }
+
+                    @Override
+                    protected void completed() {
+                    }
+
+                    @Override
+                    public void failed(final Exception cause) {
+                        System.out.println("(push)->" + cause);
+                    }
+
+                    @Override
+                    public void releaseResources() {
+                    }
+
+                };
+            }
+
+        });
+
+        final HttpHost target = new HttpHost("http2bin.org");
+        final String requestURI = "/";
+        Future<Void> future = client.execute(
+                AsyncRequestBuilder.get(target, requestURI).build(),
+                new AbstractCharResponseConsumer<Void>() {
+
+                    @Override
+                    protected void start(
+                            final HttpResponse response,
+                            final ContentType contentType) throws HttpException, IOException {
+                        System.out.println(requestURI + "->" + new StatusLine(response));
+                    }
+
+                    @Override
+                    protected int capacity() {
+                        return Integer.MAX_VALUE;
+                    }
+
+                    @Override
+                    protected void data(final CharBuffer data, final boolean endOfStream) throws IOException {
+                    }
+
+                    @Override
+                    protected Void buildResult() throws IOException {
+                        return null;
+                    }
+
+                    @Override
+                    public void failed(final Exception cause) {
+                        System.out.println(requestURI + "->" + cause);
+                    }
+
+                    @Override
+                    public void releaseResources() {
+                    }
+
+                }, null);
+        future.get();
+
+        System.out.println("Shutting down");
+        client.shutdown(5, TimeUnit.SECONDS);
+    }
+
+}

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2ServerPush.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2ServerPush.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttp2ServerPush.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttpExchangeStreaming.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttpExchangeStreaming.java?rev=1784909&r1=1784908&r2=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttpExchangeStreaming.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/examples/org/apache/hc/client5/http/examples/AsyncClientHttpExchangeStreaming.java Wed Mar  1 10:47:22 2017
@@ -91,7 +91,7 @@ public class AsyncClientHttpExchangeStre
                         }
 
                         @Override
-                        protected Void getResult() {
+                        protected Void buildResult() throws IOException {
                             return null;
                         }
 
@@ -100,6 +100,10 @@ public class AsyncClientHttpExchangeStre
                             System.out.println(requestUri + "->" + cause);
                         }
 
+                        @Override
+                        public void releaseResources() {
+                        }
+
                     }, null);
             future.get();
         }

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java?rev=1784909&r1=1784908&r2=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/HttpAsyncClient.java Wed Mar  1 10:47:22 2017
@@ -29,7 +29,9 @@ package org.apache.hc.client5.http.async
 import java.util.concurrent.Future;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
 import org.apache.hc.core5.http.protocol.HttpContext;
@@ -68,7 +70,7 @@ public interface HttpAsyncClient {
      *
      * @param <T> the result type of request execution.
      * @param requestProducer request producer callback.
-     * @param responseConsumer response consumer callaback.
+     * @param responseConsumer response consumer callback.
      * @param context HTTP context
      * @param callback future callback.
      * @return future representing pending completion of the operation.
@@ -79,4 +81,15 @@ public interface HttpAsyncClient {
             HttpContext context,
             FutureCallback<T> callback);
 
+    /**
+     * Registers {@link AsyncPushConsumer} for the given host and the URI pattern.
+     *
+     * @param hostname the name of the host this consumer is intended for.
+     *                 Can be {@code null} if it applies to all hosts
+     * @param uriPattern URI request pattern
+     * @param supplier supplier that will be used to supply a consumer instance
+     *                 for the given combination of hostname and URI pattern.
+     */
+    void register(String hostname, String uriPattern, Supplier<AsyncPushConsumer> supplier);
+
 }

Copied: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinDataConsumer.java (from r1784141, httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinDataConsumer.java?p2=httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinDataConsumer.java&p1=httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java&r1=1784141&r2=1784909&rev=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinDataConsumer.java Wed Mar  1 10:47:22 2017
@@ -27,53 +27,23 @@
 package org.apache.hc.client5.http.async.methods;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
-import java.nio.charset.UnsupportedCharsetException;
 import java.util.List;
 
-import org.apache.hc.core5.concurrent.FutureCallback;
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
-import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
 import org.apache.hc.core5.http.nio.CapacityChannel;
 
-public abstract class AbstractBinResponseConsumer<T> implements AsyncResponseConsumer<T> {
+public abstract class AbstractBinDataConsumer implements AsyncDataConsumer {
 
     private static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]);
 
-    private volatile FutureCallback<T> resultCallback;
-
-    protected abstract void start(HttpResponse response, ContentType contentType) throws HttpException, IOException;
-
     protected abstract int capacity();
 
     protected abstract void data(ByteBuffer data, boolean endOfStream) throws IOException;
 
-    protected abstract T getResult();
-
-    @Override
-    public final void consumeResponse(
-            final HttpResponse response,
-            final EntityDetails entityDetails,
-            final FutureCallback<T> resultCallback) throws HttpException, IOException {
-        this.resultCallback = resultCallback;
-        if (entityDetails != null) {
-            try {
-                final ContentType contentType = ContentType.parse(entityDetails.getContentType());
-                start(response, contentType);
-            } catch (UnsupportedCharsetException ex) {
-                throw new UnsupportedEncodingException(ex.getMessage());
-            }
-        } else {
-            start(response, null);
-            resultCallback.completed(getResult());
-        }
-
-    }
+    protected abstract void completed();
 
     @Override
     public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
@@ -89,15 +59,7 @@ public abstract class AbstractBinRespons
     @Override
     public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
         data(EMPTY, true);
-        resultCallback.completed(getResult());
-    }
-
-    @Override
-    public void failed(final Exception cause) {
-    }
-
-    @Override
-    public void releaseResources() {
+        completed();
     }
 
 }
\ No newline at end of file

Copied: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinPushConsumer.java (from r1784141, httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinPushConsumer.java?p2=httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinPushConsumer.java&p1=httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java&r1=1784141&r2=1784909&rev=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinPushConsumer.java Wed Mar  1 10:47:22 2017
@@ -28,76 +28,40 @@ package org.apache.hc.client5.http.async
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
 import java.nio.charset.UnsupportedCharsetException;
-import java.util.List;
 
-import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
-import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 
-public abstract class AbstractBinResponseConsumer<T> implements AsyncResponseConsumer<T> {
+public abstract class AbstractBinPushConsumer extends AbstractBinDataConsumer implements AsyncPushConsumer {
 
-    private static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]);
-
-    private volatile FutureCallback<T> resultCallback;
-
-    protected abstract void start(HttpResponse response, ContentType contentType) throws HttpException, IOException;
-
-    protected abstract int capacity();
-
-    protected abstract void data(ByteBuffer data, boolean endOfStream) throws IOException;
-
-    protected abstract T getResult();
+    protected abstract void start(HttpRequest promise, HttpResponse response, ContentType contentType) throws HttpException, IOException;
 
     @Override
-    public final void consumeResponse(
+    public final void consumePromise(
+            final HttpRequest promise,
             final HttpResponse response,
-            final EntityDetails entityDetails,
-            final FutureCallback<T> resultCallback) throws HttpException, IOException {
-        this.resultCallback = resultCallback;
+            final EntityDetails entityDetails) throws HttpException, IOException {
         if (entityDetails != null) {
+            final ContentType contentType;
             try {
-                final ContentType contentType = ContentType.parse(entityDetails.getContentType());
-                start(response, contentType);
+                contentType = ContentType.parse(entityDetails.getContentType());
             } catch (UnsupportedCharsetException ex) {
                 throw new UnsupportedEncodingException(ex.getMessage());
             }
+            start(promise, response, contentType);
         } else {
-            start(response, null);
-            resultCallback.completed(getResult());
+            start(promise, response, null);
+            completed();
         }
-
-    }
-
-    @Override
-    public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-        capacityChannel.update(capacity());
-    }
-
-    @Override
-    public final int consume(final ByteBuffer src) throws IOException {
-        data(src, false);
-        return capacity();
-    }
-
-    @Override
-    public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
-        data(EMPTY, true);
-        resultCallback.completed(getResult());
     }
 
     @Override
     public void failed(final Exception cause) {
     }
 
-    @Override
-    public void releaseResources() {
-    }
-
 }
\ No newline at end of file

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java?rev=1784909&r1=1784908&r2=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java Wed Mar  1 10:47:22 2017
@@ -28,32 +28,22 @@ package org.apache.hc.client5.http.async
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
 import java.nio.charset.UnsupportedCharsetException;
-import java.util.List;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
-import org.apache.hc.core5.http.nio.CapacityChannel;
 
-public abstract class AbstractBinResponseConsumer<T> implements AsyncResponseConsumer<T> {
-
-    private static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]);
+public abstract class AbstractBinResponseConsumer<T> extends AbstractBinDataConsumer implements AsyncResponseConsumer<T> {
 
     private volatile FutureCallback<T> resultCallback;
 
     protected abstract void start(HttpResponse response, ContentType contentType) throws HttpException, IOException;
 
-    protected abstract int capacity();
-
-    protected abstract void data(ByteBuffer data, boolean endOfStream) throws IOException;
-
-    protected abstract T getResult();
+    protected abstract T buildResult();
 
     @Override
     public final void consumeResponse(
@@ -70,34 +60,18 @@ public abstract class AbstractBinRespons
             }
         } else {
             start(response, null);
-            resultCallback.completed(getResult());
+            completed();
         }
 
     }
 
     @Override
-    public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-        capacityChannel.update(capacity());
-    }
-
-    @Override
-    public final int consume(final ByteBuffer src) throws IOException {
-        data(src, false);
-        return capacity();
-    }
-
-    @Override
-    public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
-        data(EMPTY, true);
-        resultCallback.completed(getResult());
+    protected final void completed() {
+        resultCallback.completed(buildResult());
     }
 
     @Override
     public void failed(final Exception cause) {
     }
 
-    @Override
-    public void releaseResources() {
-    }
-
 }
\ No newline at end of file

Copied: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharDataConsumer.java (from r1784141, httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharResponseConsumer.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharDataConsumer.java?p2=httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharDataConsumer.java&p1=httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharResponseConsumer.java&r1=1784141&r2=1784909&rev=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharResponseConsumer.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharDataConsumer.java Wed Mar  1 10:47:22 2017
@@ -27,67 +27,34 @@
 package org.apache.hc.client5.http.async.methods;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
-import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
 import java.nio.charset.CoderResult;
-import java.nio.charset.StandardCharsets;
-import java.nio.charset.UnsupportedCharsetException;
 import java.util.List;
 
-import org.apache.hc.core5.concurrent.FutureCallback;
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
-import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
 import org.apache.hc.core5.http.nio.CapacityChannel;
 import org.apache.hc.core5.util.Asserts;
 
-public abstract class AbstractCharResponseConsumer<T> implements AsyncResponseConsumer<T> {
+public abstract class AbstractCharDataConsumer implements AsyncDataConsumer {
 
     private static final ByteBuffer EMPTY_BIN = ByteBuffer.wrap(new byte[0]);
-    private static final CharBuffer EMPTY_CHAR = CharBuffer.wrap(new char[0]);
 
     private final CharBuffer charbuf = CharBuffer.allocate(8192);
 
     private volatile CharsetDecoder charsetDecoder;
-    private volatile FutureCallback<T> resultCallback;
-
-    protected abstract void start(HttpResponse response, ContentType contentType) throws HttpException, IOException;
 
     protected abstract int capacity();
 
     protected abstract void data(CharBuffer data, boolean endOfStream) throws IOException;
 
-    protected abstract T getResult();
+    protected abstract void completed() throws IOException;
 
-    @Override
-    public final void consumeResponse(
-            final HttpResponse response,
-            final EntityDetails entityDetails,
-            final FutureCallback<T> resultCallback) throws HttpException, IOException {
-        this.resultCallback = resultCallback;
-        if (entityDetails != null) {
-            ContentType contentType;
-            try {
-                contentType = ContentType.parse(entityDetails.getContentType());
-            } catch (UnsupportedCharsetException ex) {
-                throw new UnsupportedEncodingException(ex.getMessage());
-            }
-            Charset charset = contentType != null ? contentType.getCharset() : null;
-            if (charset == null) {
-                charset = StandardCharsets.US_ASCII;
-            }
-            charsetDecoder = charset.newDecoder();
-            start(response, contentType);
-        } else {
-            start(response, null);
-            resultCallback.completed(getResult());
-        }
+    protected final void setCharsetDecoder(final CharsetDecoder charsetDecoder) {
+        this.charsetDecoder = charsetDecoder;
     }
 
     @Override
@@ -101,11 +68,11 @@ public abstract class AbstractCharRespon
         }
     }
 
-    private void doDecode() throws IOException {
+    private void doDecode(final boolean endOfStream) throws IOException {
         charbuf.flip();
         final int chunk = charbuf.remaining();
         if (chunk > 0) {
-            data(charbuf, false);
+            data(charbuf, endOfStream);
         }
         charbuf.clear();
     }
@@ -115,7 +82,7 @@ public abstract class AbstractCharRespon
         Asserts.notNull(charsetDecoder, "Charset decoder");
         while (src.hasRemaining()) {
             checkResult(charsetDecoder.decode(src, charbuf, false));
-            doDecode();
+            doDecode(false);
         }
         return capacity();
     }
@@ -124,19 +91,10 @@ public abstract class AbstractCharRespon
     public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
         Asserts.notNull(charsetDecoder, "Charset decoder");
         checkResult(charsetDecoder.decode(EMPTY_BIN, charbuf, true));
-        doDecode();
+        doDecode(false);
         checkResult(charsetDecoder.flush(charbuf));
-        doDecode();
-        data(EMPTY_CHAR, true);
-        resultCallback.completed(getResult());
-    }
-
-    @Override
-    public void failed(final Exception cause) {
-    }
-
-    @Override
-    public void releaseResources() {
+        doDecode(true);
+        completed();
     }
 
 }
\ No newline at end of file

Copied: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharPushConsumer.java (from r1784141, httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharPushConsumer.java?p2=httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharPushConsumer.java&p1=httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java&r1=1784141&r2=1784909&rev=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractBinResponseConsumer.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharPushConsumer.java Wed Mar  1 10:47:22 2017
@@ -28,76 +28,47 @@ package org.apache.hc.client5.http.async
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.nio.charset.UnsupportedCharsetException;
-import java.util.List;
 
-import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
-import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 
-public abstract class AbstractBinResponseConsumer<T> implements AsyncResponseConsumer<T> {
+public abstract class AbstractCharPushConsumer extends AbstractCharDataConsumer implements AsyncPushConsumer {
 
-    private static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]);
-
-    private volatile FutureCallback<T> resultCallback;
-
-    protected abstract void start(HttpResponse response, ContentType contentType) throws HttpException, IOException;
-
-    protected abstract int capacity();
-
-    protected abstract void data(ByteBuffer data, boolean endOfStream) throws IOException;
-
-    protected abstract T getResult();
+    protected abstract void start(HttpRequest promise, HttpResponse response, ContentType contentType) throws HttpException, IOException;
 
     @Override
-    public final void consumeResponse(
+    public final void consumePromise(
+            final HttpRequest promise,
             final HttpResponse response,
-            final EntityDetails entityDetails,
-            final FutureCallback<T> resultCallback) throws HttpException, IOException {
-        this.resultCallback = resultCallback;
+            final EntityDetails entityDetails) throws HttpException, IOException {
         if (entityDetails != null) {
+            final ContentType contentType;
             try {
-                final ContentType contentType = ContentType.parse(entityDetails.getContentType());
-                start(response, contentType);
+                contentType = ContentType.parse(entityDetails.getContentType());
             } catch (UnsupportedCharsetException ex) {
                 throw new UnsupportedEncodingException(ex.getMessage());
             }
+            Charset charset = contentType != null ? contentType.getCharset() : null;
+            if (charset == null) {
+                charset = StandardCharsets.US_ASCII;
+            }
+            setCharsetDecoder(charset.newDecoder());
+            start(promise, response, contentType);
         } else {
-            start(response, null);
-            resultCallback.completed(getResult());
+            start(promise, response, null);
+            completed();
         }
-
-    }
-
-    @Override
-    public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-        capacityChannel.update(capacity());
-    }
-
-    @Override
-    public final int consume(final ByteBuffer src) throws IOException {
-        data(src, false);
-        return capacity();
-    }
-
-    @Override
-    public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
-        data(EMPTY, true);
-        resultCallback.completed(getResult());
     }
 
     @Override
     public void failed(final Exception cause) {
     }
 
-    @Override
-    public void releaseResources() {
-    }
-
 }
\ No newline at end of file

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharResponseConsumer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharResponseConsumer.java?rev=1784909&r1=1784908&r2=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharResponseConsumer.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/AbstractCharResponseConsumer.java Wed Mar  1 10:47:22 2017
@@ -28,42 +28,24 @@ package org.apache.hc.client5.http.async
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
 import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CoderResult;
 import java.nio.charset.StandardCharsets;
 import java.nio.charset.UnsupportedCharsetException;
-import java.util.List;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
-import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.util.Asserts;
 
-public abstract class AbstractCharResponseConsumer<T> implements AsyncResponseConsumer<T> {
+public abstract class AbstractCharResponseConsumer<T> extends AbstractCharDataConsumer implements AsyncResponseConsumer<T> {
 
-    private static final ByteBuffer EMPTY_BIN = ByteBuffer.wrap(new byte[0]);
-    private static final CharBuffer EMPTY_CHAR = CharBuffer.wrap(new char[0]);
-
-    private final CharBuffer charbuf = CharBuffer.allocate(8192);
-
-    private volatile CharsetDecoder charsetDecoder;
     private volatile FutureCallback<T> resultCallback;
 
     protected abstract void start(HttpResponse response, ContentType contentType) throws HttpException, IOException;
 
-    protected abstract int capacity();
-
-    protected abstract void data(CharBuffer data, boolean endOfStream) throws IOException;
-
-    protected abstract T getResult();
+    protected abstract T buildResult() throws IOException;
 
     @Override
     public final void consumeResponse(
@@ -72,7 +54,7 @@ public abstract class AbstractCharRespon
             final FutureCallback<T> resultCallback) throws HttpException, IOException {
         this.resultCallback = resultCallback;
         if (entityDetails != null) {
-            ContentType contentType;
+            final ContentType contentType;
             try {
                 contentType = ContentType.parse(entityDetails.getContentType());
             } catch (UnsupportedCharsetException ex) {
@@ -82,61 +64,21 @@ public abstract class AbstractCharRespon
             if (charset == null) {
                 charset = StandardCharsets.US_ASCII;
             }
-            charsetDecoder = charset.newDecoder();
+            setCharsetDecoder(charset.newDecoder());
             start(response, contentType);
         } else {
             start(response, null);
-            resultCallback.completed(getResult());
-        }
-    }
-
-    @Override
-    public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-        capacityChannel.update(capacity());
-    }
-
-    private void checkResult(final CoderResult result) throws IOException {
-        if (result.isError()) {
-            result.throwException();
+            completed();
         }
     }
 
-    private void doDecode() throws IOException {
-        charbuf.flip();
-        final int chunk = charbuf.remaining();
-        if (chunk > 0) {
-            data(charbuf, false);
-        }
-        charbuf.clear();
-    }
-
-    @Override
-    public final int consume(final ByteBuffer src) throws IOException {
-        Asserts.notNull(charsetDecoder, "Charset decoder");
-        while (src.hasRemaining()) {
-            checkResult(charsetDecoder.decode(src, charbuf, false));
-            doDecode();
-        }
-        return capacity();
-    }
-
     @Override
-    public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
-        Asserts.notNull(charsetDecoder, "Charset decoder");
-        checkResult(charsetDecoder.decode(EMPTY_BIN, charbuf, true));
-        doDecode();
-        checkResult(charsetDecoder.flush(charbuf));
-        doDecode();
-        data(EMPTY_CHAR, true);
-        resultCallback.completed(getResult());
+    protected final void completed() throws IOException {
+        resultCallback.completed(buildResult());
     }
 
     @Override
     public void failed(final Exception cause) {
     }
 
-    @Override
-    public void releaseResources() {
-    }
-
 }
\ No newline at end of file

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractHttpAsyncClientBase.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractHttpAsyncClientBase.java?rev=1784909&r1=1784908&r2=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractHttpAsyncClientBase.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AbstractHttpAsyncClientBase.java Wed Mar  1 10:47:22 2017
@@ -34,7 +34,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.ExceptionListener;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.http.nio.command.ShutdownType;
 import org.apache.hc.core5.reactor.ConnectionInitiator;
@@ -54,6 +56,7 @@ abstract class AbstractHttpAsyncClientBa
 
     final Logger log = LogManager.getLogger(getClass());
 
+    private final AsyncPushConsumerRegistry pushConsumerRegistry;
     private final DefaultConnectingIOReactor ioReactor;
     private final ExceptionListener exceptionListener;
     private final ExecutorService executorService;
@@ -61,6 +64,7 @@ abstract class AbstractHttpAsyncClientBa
 
     public AbstractHttpAsyncClientBase(
             final IOEventHandlerFactory eventHandlerFactory,
+            final AsyncPushConsumerRegistry pushConsumerRegistry,
             final IOReactorConfig reactorConfig,
             final ThreadFactory threadFactory,
             final ThreadFactory workerThreadFactory) throws IOReactorException {
@@ -77,6 +81,7 @@ abstract class AbstractHttpAsyncClientBa
                     }
 
                 });
+        this.pushConsumerRegistry = pushConsumerRegistry;
         this.exceptionListener = new ExceptionListener() {
             @Override
             public void onError(final Exception ex) {
@@ -97,15 +102,18 @@ abstract class AbstractHttpAsyncClientBa
                     try {
                         ioReactor.execute();
                     } catch (Exception ex) {
-                        if (exceptionListener != null) {
-                            exceptionListener.onError(ex);
-                        }
+                        exceptionListener.onError(ex);
                     }
                 }
             });
         }
     }
 
+    @Override
+    public void register(final String hostname, final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
+        pushConsumerRegistry.register(hostname, uriPattern, supplier);
+    }
+
     void ensureRunning() {
         switch (status.get()) {
             case READY:

Added: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncPushConsumerRegistry.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncPushConsumerRegistry.java?rev=1784909&view=auto
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncPushConsumerRegistry.java (added)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncPushConsumerRegistry.java Wed Mar  1 10:47:22 2017
@@ -0,0 +1,99 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.client5.http.impl.async;
+
+import java.util.Locale;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.protocol.UriPatternMatcher;
+import org.apache.hc.core5.net.URIAuthority;
+import org.apache.hc.core5.util.Args;
+
+class AsyncPushConsumerRegistry {
+
+    private final UriPatternMatcher<Supplier<AsyncPushConsumer>> primary;
+    private final ConcurrentMap<String, UriPatternMatcher<Supplier<AsyncPushConsumer>>> hostMap;
+
+    public AsyncPushConsumerRegistry() {
+        this.primary = new UriPatternMatcher<>();
+        this.hostMap = new ConcurrentHashMap<>();
+    }
+
+    private UriPatternMatcher<Supplier<AsyncPushConsumer>> getPatternMatcher(final String hostname) {
+        if (hostname == null) {
+            return primary;
+        }
+        final UriPatternMatcher<Supplier<AsyncPushConsumer>> hostMatcher = hostMap.get(hostname);
+        if (hostMatcher != null) {
+            return hostMatcher;
+        }
+        return primary;
+    }
+
+    public AsyncPushConsumer get(final HttpRequest request) throws HttpException {
+        Args.notNull(request, "Request");
+        final URIAuthority authority = request.getAuthority();
+        final String key = authority != null ? authority.getHostName().toLowerCase(Locale.ROOT) : null;
+        final UriPatternMatcher<Supplier<AsyncPushConsumer>> patternMatcher = getPatternMatcher(key);
+        if (patternMatcher == null) {
+            return null;
+        }
+        String path = request.getPath();
+        final int i = path.indexOf("?");
+        if (i != -1) {
+            path = path.substring(0, i);
+        }
+        final Supplier<AsyncPushConsumer> supplier = patternMatcher.lookup(path);
+        return supplier != null ? supplier.get() : null;
+    }
+
+    public void register(final String hostname, final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
+        Args.notBlank(uriPattern, "URI pattern");
+        Args.notNull(supplier, "Supplier");
+        if (hostname == null) {
+            primary.register(uriPattern, supplier);
+        } else {
+            final String key = hostname.toLowerCase(Locale.ROOT);
+            UriPatternMatcher<Supplier<AsyncPushConsumer>> matcher = hostMap.get(key);
+            if (matcher == null) {
+                final UriPatternMatcher<Supplier<AsyncPushConsumer>> newMatcher = new UriPatternMatcher<>();
+                matcher = hostMap.putIfAbsent(key, newMatcher);
+                if (matcher == null) {
+                    matcher = newMatcher;
+                }
+            }
+            matcher.register(uriPattern, supplier);
+        }
+    }
+
+}

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncPushConsumerRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncPushConsumerRegistry.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncPushConsumerRegistry.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java?rev=1784909&r1=1784908&r2=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java Wed Mar  1 10:47:22 2017
@@ -37,7 +37,9 @@ import org.apache.hc.client5.http.protoc
 import org.apache.hc.core5.annotation.Contract;
 import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
 import org.apache.hc.core5.reactor.ExceptionEvent;
@@ -76,4 +78,8 @@ public abstract class CloseableHttpAsync
         return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback);
     }
 
+    public final void register(final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
+        register(null, uriPattern, supplier);
+    }
+
 }

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java?rev=1784909&r1=1784908&r2=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java Wed Mar  1 10:47:22 2017
@@ -43,13 +43,13 @@ import org.apache.hc.client5.http.config
 import org.apache.hc.client5.http.impl.DefaultConnectionKeepAliveStrategy;
 import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
 import org.apache.hc.client5.http.impl.DefaultThreadFactory;
-import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
 import org.apache.hc.client5.http.impl.DefaultUserTokenHandler;
+import org.apache.hc.client5.http.impl.IdleConnectionEvictor;
+import org.apache.hc.client5.http.impl.NoopUserTokenHandler;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
 import org.apache.hc.client5.http.impl.routing.DefaultProxyRoutePlanner;
 import org.apache.hc.client5.http.impl.routing.DefaultRoutePlanner;
 import org.apache.hc.client5.http.impl.routing.SystemDefaultRoutePlanner;
-import org.apache.hc.client5.http.impl.IdleConnectionEvictor;
-import org.apache.hc.client5.http.impl.NoopUserTokenHandler;
 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
 import org.apache.hc.client5.http.protocol.RequestDefaultHeaders;
 import org.apache.hc.client5.http.protocol.RequestExpectContinue;
@@ -57,6 +57,7 @@ import org.apache.hc.client5.http.protoc
 import org.apache.hc.client5.http.routing.HttpRoutePlanner;
 import org.apache.hc.core5.http.ConnectionReuseStrategy;
 import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpRequestInterceptor;
@@ -66,6 +67,8 @@ import org.apache.hc.core5.http.HttpVers
 import org.apache.hc.core5.http.config.ConnectionConfig;
 import org.apache.hc.core5.http.config.H1Config;
 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.protocol.HttpProcessor;
 import org.apache.hc.core5.http.protocol.HttpProcessorBuilder;
@@ -518,11 +521,19 @@ public class HttpAsyncClientBuilder {
             }
             closeablesCopy.add(connManagerCopy);
         }
+        final AsyncPushConsumerRegistry pushConsumerRegistry = new AsyncPushConsumerRegistry();
         final IOEventHandlerFactory ioEventHandlerFactory;
         if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2)) {
             ioEventHandlerFactory = new DefaultAsyncHttp2ClientEventHandlerFactory(
                     httpProcessor,
-                    null,
+                    new HandlerFactory<AsyncPushConsumer>() {
+
+                        @Override
+                        public AsyncPushConsumer create(final HttpRequest request) throws HttpException {
+                            return pushConsumerRegistry.get(request);
+                        }
+
+                    },
                     StandardCharsets.US_ASCII,
                     h2Config != null ? h2Config : H2Config.DEFAULT);
         } else {
@@ -554,6 +565,7 @@ public class HttpAsyncClientBuilder {
         try {
             return new InternalHttpAsyncClient(
                     ioEventHandlerFactory,
+                    pushConsumerRegistry,
                     ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT,
                     new DefaultThreadFactory("httpclient-main", true),
                     new DefaultThreadFactory("httpclient-dispatch", true),

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java?rev=1784909&r1=1784908&r2=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java Wed Mar  1 10:47:22 2017
@@ -32,10 +32,14 @@ import java.nio.charset.StandardCharsets
 import org.apache.hc.client5.http.impl.DefaultThreadFactory;
 import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpVersion;
 import org.apache.hc.core5.http.config.ConnectionConfig;
 import org.apache.hc.core5.http.config.H1Config;
 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
 import org.apache.hc.core5.http.protocol.HttpProcessor;
 import org.apache.hc.core5.http.protocol.RequestUserAgent;
@@ -117,10 +121,12 @@ public class HttpAsyncClients {
 
     private static MinimalHttpAsyncClient createMinimalImpl(
             final IOEventHandlerFactory eventHandlerFactory,
+            final AsyncPushConsumerRegistry pushConsumerRegistry,
             final AsyncClientConnectionManager connmgr) {
         try {
             return new MinimalHttpAsyncClient(
                     eventHandlerFactory,
+                    pushConsumerRegistry,
                     IOReactorConfig.DEFAULT,
                     new DefaultThreadFactory("httpclient-main", true),
                     new DefaultThreadFactory("httpclient-dispatch", true),
@@ -133,22 +139,34 @@ public class HttpAsyncClients {
     private static MinimalHttpAsyncClient createMinimalImpl(
             final H1Config h1Config,
             final AsyncClientConnectionManager connmgr) {
-        return createMinimalImpl(new DefaultAsyncHttp1ClientEventHandlerFactory(
+        return createMinimalImpl(
+                new DefaultAsyncHttp1ClientEventHandlerFactory(
                         createMinimalProtocolProcessor(),
                         h1Config,
                         ConnectionConfig.DEFAULT,
                         DefaultConnectionReuseStrategy.INSTANCE),
+                new AsyncPushConsumerRegistry(),
                 connmgr);
     }
 
     private static MinimalHttpAsyncClient createMinimalImpl(
             final H2Config h2Config,
             final AsyncClientConnectionManager connmgr) {
-        return createMinimalImpl(new DefaultAsyncHttp2ClientEventHandlerFactory(
+        final AsyncPushConsumerRegistry pushConsumerRegistry = new AsyncPushConsumerRegistry();
+        return createMinimalImpl(
+                new DefaultAsyncHttp2ClientEventHandlerFactory(
                         createMinimalProtocolProcessor(),
-                        null,
+                        new HandlerFactory<AsyncPushConsumer>() {
+
+                            @Override
+                            public AsyncPushConsumer create(final HttpRequest request) throws HttpException {
+                                return pushConsumerRegistry.get(request);
+                            }
+
+                        },
                         StandardCharsets.US_ASCII,
                         h2Config),
+                pushConsumerRegistry,
                 connmgr);
     }
 

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java?rev=1784909&r1=1784908&r2=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java Wed Mar  1 10:47:22 2017
@@ -82,8 +82,9 @@ class InternalHttpAsyncClient extends Ab
     private final RequestConfig defaultConfig;
     private final List<Closeable> closeables;
 
-    public InternalHttpAsyncClient(
+    InternalHttpAsyncClient(
             final IOEventHandlerFactory eventHandlerFactory,
+            final AsyncPushConsumerRegistry pushConsumerRegistry,
             final IOReactorConfig reactorConfig,
             final ThreadFactory threadFactory,
             final ThreadFactory workerThreadFactory,
@@ -93,7 +94,7 @@ class InternalHttpAsyncClient extends Ab
             final UserTokenHandler userTokenHandler,
             final RequestConfig defaultConfig,
             final List<Closeable> closeables) throws IOReactorException {
-        super(eventHandlerFactory, reactorConfig, threadFactory, workerThreadFactory);
+        super(eventHandlerFactory, pushConsumerRegistry, reactorConfig, threadFactory, workerThreadFactory);
         this.connmgr = connmgr;
         this.routePlanner = routePlanner;
         this.keepAliveStrategy = keepAliveStrategy;

Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java?rev=1784909&r1=1784908&r2=1784909&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java Wed Mar  1 10:47:22 2017
@@ -61,11 +61,12 @@ class MinimalHttpAsyncClient extends Abs
 
     public MinimalHttpAsyncClient(
             final IOEventHandlerFactory eventHandlerFactory,
+            final AsyncPushConsumerRegistry pushConsumerRegistry,
             final IOReactorConfig reactorConfig,
             final ThreadFactory threadFactory,
             final ThreadFactory workerThreadFactory,
             final AsyncClientConnectionManager connmgr) throws IOReactorException {
-        super(eventHandlerFactory, reactorConfig, threadFactory, workerThreadFactory);
+        super(eventHandlerFactory, pushConsumerRegistry, reactorConfig, threadFactory, workerThreadFactory);
         this.connmgr = connmgr;
     }
 



Mime
View raw message