hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject [2/6] httpcomponents-core git commit: Protocol handling API refactoring (no functional changes, mostly moving code to different packages)
Date Tue, 01 Aug 2017 19:07:00 GMT
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityConsumer.java
deleted file mode 100644
index 6640697..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityConsumer.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.impl.nio.entity;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hc.core5.concurrent.FutureCallback;
-import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.Header;
-import org.apache.hc.core5.http.HttpException;
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
-import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.http.nio.entity.ContentInputStream;
-import org.apache.hc.core5.util.Args;
-
-/**
- * @since 5.0
- */
-public abstract class AbstractClassicEntityConsumer<T> implements AsyncEntityConsumer<T> {
-
-    private enum State { IDLE, ACTIVE, COMPLETED }
-
-    private final Executor executor;
-    private final SharedInputBuffer buffer;
-    private final AtomicReference<State> state;
-    private final AtomicReference<T> resultRef;
-    private final AtomicReference<Exception> exceptionRef;
-
-    public AbstractClassicEntityConsumer(final int initialBufferSize, final Executor executor) {
-        this.executor = Args.notNull(executor, "Executor");
-        this.buffer = new SharedInputBuffer(initialBufferSize);
-        this.state = new AtomicReference<>(State.IDLE);
-        this.resultRef = new AtomicReference<>(null);
-        this.exceptionRef = new AtomicReference<>(null);
-    }
-
-    protected abstract T consumeData(ContentType contentType, InputStream inputStream) throws IOException;
-
-    @Override
-    public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-        buffer.updateCapacity(capacityChannel);
-    }
-
-    @Override
-    public final void streamStart(final EntityDetails entityDetails, final FutureCallback<T> resultCallback) throws HttpException, IOException {
-        final ContentType contentType;
-        try {
-            contentType = ContentType.parse(entityDetails.getContentType());
-        } catch (final UnsupportedCharsetException ex) {
-            throw new UnsupportedEncodingException(ex.getMessage());
-        }
-        if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
-            executor.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        final T result = consumeData(contentType, new ContentInputStream(buffer));
-                        resultRef.set(result);
-                        resultCallback.completed(result);
-                    } catch (final Exception ex) {
-                        buffer.abort();
-                        resultCallback.failed(ex);
-                    } finally {
-                        state.set(State.COMPLETED);
-                    }
-                }
-
-            });
-        }
-    }
-
-    @Override
-    public final int consume(final ByteBuffer src) throws IOException {
-        return buffer.fill(src);
-    }
-
-    @Override
-    public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
-        buffer.markEndStream();
-    }
-
-    @Override
-    public final void failed(final Exception cause) {
-        if (exceptionRef.compareAndSet(null, cause)) {
-            releaseResources();
-        }
-    }
-
-    public final Exception getException() {
-        return exceptionRef.get();
-    }
-
-    @Override
-    public final T getContent() {
-        return resultRef.get();
-    }
-
-    @Override
-    public void releaseResources() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityProducer.java
deleted file mode 100644
index 40768b1..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityProducer.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.impl.nio.entity;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.nio.AsyncEntityProducer;
-import org.apache.hc.core5.http.nio.DataStreamChannel;
-import org.apache.hc.core5.http.nio.entity.ContentOutputStream;
-import org.apache.hc.core5.util.Args;
-
-/**
- * @since 5.0
- */
-public abstract class AbstractClassicEntityProducer implements AsyncEntityProducer {
-
-    private enum State { IDLE, ACTIVE, COMPLETED }
-
-    private final SharedOutputBuffer buffer;
-    private final ContentType contentType;
-    private final Executor executor;
-    private final AtomicReference<State> state;
-    private final AtomicReference<Exception> exception;
-
-    public AbstractClassicEntityProducer(final int initialBufferSize, final ContentType contentType, final Executor executor) {
-        this.buffer = new SharedOutputBuffer(initialBufferSize);
-        this.contentType = contentType;
-        this.executor = Args.notNull(executor, "Executor");
-        this.state = new AtomicReference<>(State.IDLE);
-        this.exception = new AtomicReference<>(null);
-    }
-
-    protected abstract void produceData(ContentType contentType, OutputStream outputStream) throws IOException;
-
-    @Override
-    public final int available() {
-        return buffer.length();
-    }
-
-    @Override
-    public final void produce(final DataStreamChannel channel) throws IOException {
-        if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
-            executor.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        produceData(contentType, new ContentOutputStream(buffer));
-                        buffer.writeCompleted();
-                    } catch (final Exception ex) {
-                        buffer.abort();
-                    } finally {
-                        state.set(State.COMPLETED);
-                    }
-                }
-
-            });
-        }
-        buffer.flush(channel);
-    }
-
-    @Override
-    public final long getContentLength() {
-        return -1;
-    }
-
-    @Override
-    public final String getContentType() {
-        return contentType != null ? contentType.toString() : null;
-    }
-
-    @Override
-    public String getContentEncoding() {
-        return null;
-    }
-
-    @Override
-    public final boolean isChunked() {
-        return false;
-    }
-
-    @Override
-    public final Set<String> getTrailerNames() {
-        return null;
-    }
-
-    @Override
-    public final void failed(final Exception cause) {
-        if (exception.compareAndSet(null, cause)) {
-            releaseResources();
-        }
-    }
-
-    public final Exception getException() {
-        return exception.get();
-    }
-
-    @Override
-    public void releaseResources() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractSharedBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractSharedBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractSharedBuffer.java
deleted file mode 100644
index 70fb961..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractSharedBuffer.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.impl.nio.entity;
-
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.impl.nio.ExpandableBuffer;
-import org.apache.hc.core5.util.Args;
-
-/**
- * @since 5.0
- */
-@Contract(threading = ThreadingBehavior.SAFE)
-abstract class AbstractSharedBuffer extends ExpandableBuffer {
-
-    final ReentrantLock lock;
-    final Condition condition;
-
-    volatile boolean endStream;
-    volatile boolean aborted;
-
-    public AbstractSharedBuffer(final ReentrantLock lock, final int initialBufferSize) {
-        super(initialBufferSize);
-        this.lock = Args.notNull(lock, "Lock");
-        this.condition = lock.newCondition();
-    }
-
-    @Override
-    public boolean hasData() {
-        lock.lock();
-        try {
-            return super.hasData();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public int capacity() {
-        lock.lock();
-        try {
-            return super.capacity();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public int length() {
-        lock.lock();
-        try {
-            return super.length();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void abort() {
-        lock.lock();
-        try {
-            endStream = true;
-            aborted = true;
-            condition.signalAll();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void reset() {
-        if (aborted) {
-            return;
-        }
-        lock.lock();
-        try {
-            setInputMode();
-            buffer().clear();
-            endStream = false;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public boolean isEndStream() {
-        lock.lock();
-        try {
-            return endStream && !super.hasData();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedInputBuffer.java
deleted file mode 100644
index 302982a..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedInputBuffer.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.impl.nio.entity;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.http.nio.entity.ContentInputBuffer;
-
-/**
- * @since 5.0
- */
-@Contract(threading = ThreadingBehavior.SAFE)
-public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
-
-    private volatile CapacityChannel capacityChannel;
-
-    public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
-        super(lock, initialBufferSize);
-    }
-
-    public SharedInputBuffer(final int buffersize) {
-        super(new ReentrantLock(), buffersize);
-    }
-
-    public int fill(final ByteBuffer src) throws IOException {
-        lock.lock();
-        try {
-            setInputMode();
-            ensureCapacity(buffer().position() + src.remaining());
-            buffer().put(src);
-            final int remaining = buffer().remaining();
-            condition.signalAll();
-            return remaining;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-        lock.lock();
-        try {
-            this.capacityChannel = capacityChannel;
-            setInputMode();
-            if (buffer().hasRemaining()) {
-                capacityChannel.update(buffer().remaining());
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void awaitInput() throws InterruptedIOException {
-        if (!buffer().hasRemaining()) {
-            setInputMode();
-            while (buffer().position() == 0 && !endStream && !aborted) {
-                try {
-                    condition.await();
-                } catch (final InterruptedException ex) {
-                    Thread.currentThread().interrupt();
-                    throw new InterruptedIOException(ex.getMessage());
-                }
-            }
-            setOutputMode();
-        }
-    }
-
-    @Override
-    public int read() throws IOException {
-        lock.lock();
-        try {
-            setOutputMode();
-            awaitInput();
-            if (aborted) {
-                return -1;
-            }
-            if (!buffer().hasRemaining() && endStream) {
-                return -1;
-            }
-            final int b = buffer().get() & 0xff;
-            if (!buffer().hasRemaining() && capacityChannel != null) {
-                setInputMode();
-                if (buffer().hasRemaining()) {
-                    capacityChannel.update(buffer().remaining());
-                }
-            }
-            return b;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public int read(final byte[] b, final int off, final int len) throws IOException {
-        lock.lock();
-        try {
-            setOutputMode();
-            awaitInput();
-            if (aborted) {
-                return -1;
-            }
-            if (!buffer().hasRemaining() && endStream) {
-                return -1;
-            }
-            final int chunk = Math.min(buffer().remaining(), len);
-            buffer().get(b, off, chunk);
-            if (!buffer().hasRemaining() && capacityChannel != null) {
-                setInputMode();
-                if (buffer().hasRemaining()) {
-                    capacityChannel.update(buffer().remaining());
-                }
-            }
-            return chunk;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void markEndStream() throws IOException {
-        if (endStream) {
-            return;
-        }
-        lock.lock();
-        try {
-            if (!endStream) {
-                endStream = true;
-                capacityChannel = null;
-                condition.signalAll();
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedOutputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedOutputBuffer.java
deleted file mode 100644
index 8d63def..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedOutputBuffer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.impl.nio.entity;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.nio.entity.ContentOutputBuffer;
-import org.apache.hc.core5.http.nio.DataStreamChannel;
-
-/**
- * @since 5.0
- */
-@Contract(threading = ThreadingBehavior.SAFE)
-public final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer {
-
-    private volatile DataStreamChannel dataStreamChannel;
-    private volatile boolean hasCapacity;
-
-    public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) {
-        super(lock, initialBufferSize);
-        this.hasCapacity = false;
-    }
-
-    public SharedOutputBuffer(final int buffersize) {
-        this(new ReentrantLock(), buffersize);
-    }
-
-    public void flush(final DataStreamChannel channel) throws IOException {
-        lock.lock();
-        try {
-            dataStreamChannel = channel;
-            hasCapacity = true;
-            setOutputMode();
-            if (buffer().hasRemaining()) {
-                dataStreamChannel.write(buffer());
-            }
-            if (!buffer().hasRemaining() && endStream) {
-                dataStreamChannel.endStream();
-            }
-            condition.signalAll();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void ensureNotAborted() throws InterruptedIOException {
-        if (aborted) {
-            throw new InterruptedIOException("Operation aborted");
-        }
-    }
-
-    @Override
-    public void write(final byte[] b, final int off, final int len) throws IOException {
-        final ByteBuffer src = ByteBuffer.wrap(b, off, len);
-        lock.lock();
-        try {
-            ensureNotAborted();
-            setInputMode();
-            while (src.hasRemaining()) {
-                // always buffer small chunks
-                if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) {
-                    buffer().put(src);
-                } else {
-                    if (buffer().position() > 0 || dataStreamChannel == null) {
-                        waitFlush();
-                    }
-                    if (buffer().position() == 0 && dataStreamChannel != null) {
-                        final int bytesWritten = dataStreamChannel.write(src);
-                        if (bytesWritten == 0) {
-                            hasCapacity = false;
-                            waitFlush();
-                        }
-                    }
-                }
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public void write(final int b) throws IOException {
-        lock.lock();
-        try {
-            ensureNotAborted();
-            setInputMode();
-            if (!buffer().hasRemaining()) {
-                waitFlush();
-            }
-            buffer().put((byte)b);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public void writeCompleted() throws IOException {
-        if (endStream) {
-            return;
-        }
-        lock.lock();
-        try {
-            if (!endStream) {
-                endStream = true;
-                if (dataStreamChannel != null) {
-                    setOutputMode();
-                    if (buffer().hasRemaining()) {
-                        dataStreamChannel.requestOutput();
-                    } else {
-                        dataStreamChannel.endStream();
-                    }
-                }
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void waitFlush() throws InterruptedIOException {
-        setOutputMode();
-        if (dataStreamChannel != null) {
-            dataStreamChannel.requestOutput();
-        }
-        ensureNotAborted();
-        while (buffer().hasRemaining() || !hasCapacity) {
-            try {
-                condition.await();
-            } catch (final InterruptedException ex) {
-                Thread.currentThread().interrupt();
-                throw new InterruptedIOException(ex.getMessage());
-            }
-            ensureNotAborted();
-        }
-        setInputMode();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/io/UriHttpRequestHandlerMapper.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/io/UriHttpRequestHandlerMapper.java b/httpcore5/src/main/java/org/apache/hc/core5/http/io/UriHttpRequestHandlerMapper.java
deleted file mode 100644
index 2a5c292..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/io/UriHttpRequestHandlerMapper.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.http.io;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.HttpRequest;
-import org.apache.hc.core5.http.protocol.HttpContext;
-import org.apache.hc.core5.http.protocol.LookupRegistry;
-import org.apache.hc.core5.http.protocol.UriPatternMatcher;
-import org.apache.hc.core5.util.Args;
-
-/**
- * Maintains a map of HTTP request handlers keyed by a request URI pattern.
- * <br>
- * Patterns may have three formats:
- * <ul>
- * <li>{@code *}</li>
- * <li>{@code *<uri>}</li>
- * <li>{@code <uri>*}</li>
- * </ul>
- * <br>
- * This class can be used to map an instance of
- * {@link HttpRequestHandler} matching a particular request URI. Usually the
- * mapped request handler will be used to process the request with the
- * specified request URI.
- *
- * @since 4.3
- */
-@Contract(threading = ThreadingBehavior.SAFE)
-public class UriHttpRequestHandlerMapper implements HttpRequestHandlerMapper {
-
-    private final LookupRegistry<HttpRequestHandler> matcher;
-
-    public UriHttpRequestHandlerMapper(final LookupRegistry<HttpRequestHandler> matcher) {
-        super();
-        this.matcher = Args.notNull(matcher, "Pattern matcher");
-    }
-
-    public UriHttpRequestHandlerMapper() {
-        this(new UriPatternMatcher<HttpRequestHandler>());
-    }
-
-    /**
-     * Registers the given {@link HttpRequestHandler} as a handler for URIs
-     * matching the given pattern.
-     *
-     * @param pattern the pattern to register the handler for.
-     * @param handler the handler.
-     */
-    public void register(final String pattern, final HttpRequestHandler handler) {
-        Args.notNull(pattern, "Pattern");
-        Args.notNull(handler, "Handler");
-        matcher.register(pattern, handler);
-    }
-
-    /**
-     * Removes registered handler, if exists, for the given pattern.
-     *
-     * @param pattern the pattern to unregister the handler for.
-     */
-    public void unregister(final String pattern) {
-        matcher.unregister(pattern);
-    }
-
-    /**
-     * Extracts request path from the given {@link HttpRequest}
-     */
-    protected String getRequestPath(final HttpRequest request) {
-        String uriPath = request.getPath();
-        int index = uriPath.indexOf("?");
-        if (index != -1) {
-            uriPath = uriPath.substring(0, index);
-        } else {
-            index = uriPath.indexOf("#");
-            if (index != -1) {
-                uriPath = uriPath.substring(0, index);
-            }
-        }
-        return uriPath;
-    }
-
-    /**
-     * Looks up a handler matching the given request URI.
-     *
-     * @param request the request
-     * @return handler or {@code null} if no match is found.
-     */
-    @Override
-    public HttpRequestHandler lookup(final HttpRequest request, final HttpContext context) {
-        Args.notNull(request, "HTTP request");
-        return matcher.lookup(getRequestPath(request));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/io/support/UriHttpRequestHandlerMapper.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/io/support/UriHttpRequestHandlerMapper.java b/httpcore5/src/main/java/org/apache/hc/core5/http/io/support/UriHttpRequestHandlerMapper.java
new file mode 100644
index 0000000..ec3d62c
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/io/support/UriHttpRequestHandlerMapper.java
@@ -0,0 +1,122 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.io.support;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.io.HttpRequestHandler;
+import org.apache.hc.core5.http.io.HttpRequestHandlerMapper;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.LookupRegistry;
+import org.apache.hc.core5.http.protocol.UriPatternMatcher;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Maintains a map of HTTP request handlers keyed by a request URI pattern.
+ * <br>
+ * Patterns may have three formats:
+ * <ul>
+ * <li>{@code *}</li>
+ * <li>{@code *<uri>}</li>
+ * <li>{@code <uri>*}</li>
+ * </ul>
+ * <br>
+ * This class can be used to look up an instance of
+ * {@link HttpRequestHandler} matching a particular request URI. Usually the
+ * mapped request handler will be used to process the request with the
+ * specified request URI.
+ *
+ * @since 4.3
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public class UriHttpRequestHandlerMapper implements HttpRequestHandlerMapper {
+
+    private final LookupRegistry<HttpRequestHandler> matcher;
+
+    public UriHttpRequestHandlerMapper(final LookupRegistry<HttpRequestHandler> matcher) {
+        super();
+        this.matcher = Args.notNull(matcher, "Pattern matcher");
+    }
+
+    public UriHttpRequestHandlerMapper() {
+        this(new UriPatternMatcher<HttpRequestHandler>());
+    }
+
+    /**
+     * Registers the given {@link HttpRequestHandler} as a handler for URIs
+     * matching the given pattern.
+     *
+     * @param pattern the pattern to register the handler for.
+     * @param handler the handler.
+     */
+    public void register(final String pattern, final HttpRequestHandler handler) {
+        Args.notNull(pattern, "Pattern");
+        Args.notNull(handler, "Handler");
+        matcher.register(pattern, handler);
+    }
+
+    /**
+     * Removes the registered handler, if one exists, for the given pattern.
+     *
+     * @param pattern the pattern to unregister the handler for.
+     */
+    public void unregister(final String pattern) {
+        matcher.unregister(pattern);
+    }
+
+    /**
+     * Extracts the request path, truncated at the first {@code ?} or, if there is none, at the first {@code #}.
+     */
+    protected String getRequestPath(final HttpRequest request) {
+        String uriPath = request.getPath();
+        int index = uriPath.indexOf("?");
+        if (index != -1) {
+            uriPath = uriPath.substring(0, index);
+        } else {
+            index = uriPath.indexOf("#");
+            if (index != -1) {
+                uriPath = uriPath.substring(0, index);
+            }
+        }
+        return uriPath;
+    }
+
+    /**
+     * Looks up a handler matching the given request URI.
+     *
+     * @param request the request
+     * @return handler or {@code null} if no match is found.
+     */
+    @Override
+    public HttpRequestHandler lookup(final HttpRequest request, final HttpContext context) {
+        Args.notNull(request, "HTTP request");
+        return matcher.lookup(getRequestPath(request));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientExchangeHandler.java
index ec07515..19f875e 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientExchangeHandler.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientExchangeHandler.java
@@ -38,7 +38,7 @@ import org.apache.hc.core5.http.HttpResponse;
  *
  * @since 5.0
  */
-public interface AsyncClientExchangeHandler extends AsyncDataConsumer, AsyncDataProducer {
+public interface AsyncClientExchangeHandler extends AsyncDataExchangeHandler {
 
     void produceRequest(RequestChannel channel) throws HttpException, IOException;
 
@@ -46,8 +46,6 @@ public interface AsyncClientExchangeHandler extends AsyncDataConsumer, AsyncData
 
     void consumeInformation(HttpResponse response) throws HttpException, IOException;
 
-    void failed(Exception cause);
-
     void cancel();
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncDataExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncDataExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncDataExchangeHandler.java
new file mode 100644
index 0000000..3bc09e5
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncDataExchangeHandler.java
@@ -0,0 +1,39 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio;
+
+/**
+ * Abstract asynchronous data exchange handler that acts as a data consumer
+ * and a data producer.
+ *
+ * @since 5.0
+ */
+public interface AsyncDataExchangeHandler extends AsyncDataConsumer, AsyncDataProducer {
+
+    void failed(Exception cause);
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestProducer.java
index 1d64bd3..71d0e8f 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestProducer.java
@@ -26,8 +26,9 @@
  */
 package org.apache.hc.core5.http.nio;
 
-import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.HttpRequest;
+import java.io.IOException;
+
+import org.apache.hc.core5.http.HttpException;
 
 /**
  * Abstract asynchronous request producer.
@@ -36,9 +37,7 @@ import org.apache.hc.core5.http.HttpRequest;
  */
 public interface AsyncRequestProducer extends AsyncDataProducer {
 
-    HttpRequest produceRequest();
-
-    EntityDetails getEntityDetails();
+    void sendRequest(RequestChannel requestChannel) throws HttpException, IOException;
 
     void failed(Exception cause);
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseProducer.java
index 3b35b35..8796ae1 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseProducer.java
@@ -26,8 +26,9 @@
  */
 package org.apache.hc.core5.http.nio;
 
-import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.HttpResponse;
+import java.io.IOException;
+
+import org.apache.hc.core5.http.HttpException;
 
 /**
  * Abstract asynchronous response producer.
@@ -36,9 +37,7 @@ import org.apache.hc.core5.http.HttpResponse;
  */
 public interface AsyncResponseProducer extends AsyncDataProducer {
 
-    HttpResponse produceResponse();
-
-    EntityDetails getEntityDetails();
+    void sendResponse(ResponseChannel channel) throws HttpException, IOException;
 
     void failed(Exception cause);
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerExchangeHandler.java
index 4ec752c..4f0758e 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerExchangeHandler.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerExchangeHandler.java
@@ -38,13 +38,11 @@ import org.apache.hc.core5.http.HttpRequest;
  *
  * @since 5.0
  */
-public interface AsyncServerExchangeHandler extends AsyncDataConsumer, AsyncDataProducer {
+public interface AsyncServerExchangeHandler extends AsyncDataExchangeHandler {
 
     void handleRequest(
             HttpRequest request,
             EntityDetails entityDetails,
             ResponseChannel responseChannel) throws HttpException, IOException;
 
-    void failed(Exception cause);
-
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerRequestHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerRequestHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerRequestHandler.java
new file mode 100644
index 0000000..91ed25a
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerRequestHandler.java
@@ -0,0 +1,47 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio;
+
+import java.io.IOException;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.protocol.HttpContext;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public interface AsyncServerRequestHandler<T> {
+
+    AsyncRequestConsumer<T> prepare(HttpRequest request, HttpContext context) throws HttpException;
+
+    void handle(T requestMessage, AsyncServerResponseTrigger responseTrigger, HttpContext context) throws HttpException, IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerResponseTrigger.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerResponseTrigger.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerResponseTrigger.java
new file mode 100644
index 0000000..db2e42a
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerResponseTrigger.java
@@ -0,0 +1,49 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio;
+
+import java.io.IOException;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public interface AsyncServerResponseTrigger {
+
+    void sendInformation(HttpResponse response) throws HttpException, IOException;
+
+    void submitResponse(AsyncResponseProducer responseProducer) throws HttpException, IOException;
+
+    void pushPromise(HttpRequest promise, AsyncPushProducer responseProducer) throws HttpException, IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestProducer.java
index 8e850c6..c07d979 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestProducer.java
@@ -29,7 +29,7 @@ package org.apache.hc.core5.http.nio;
 import java.io.IOException;
 import java.net.URI;
 
-import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.message.BasicHttpRequest;
@@ -61,13 +61,8 @@ public class BasicRequestProducer implements AsyncRequestProducer {
     }
 
     @Override
-    public HttpRequest produceRequest() {
-        return request;
-    }
-
-    @Override
-    public EntityDetails getEntityDetails() {
-        return dataProducer;
+    public void sendRequest(final RequestChannel requestChannel) throws HttpException, IOException {
+        requestChannel.sendRequest(request, dataProducer);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseProducer.java
index 055cf6b..a5cd1ab 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseProducer.java
@@ -28,10 +28,10 @@ package org.apache.hc.core5.http.nio;
 
 import java.io.IOException;
 
-import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.HttpStatus;
-import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.message.BasicHttpResponse;
 import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
 import org.apache.hc.core5.util.Args;
@@ -79,13 +79,8 @@ public class BasicResponseProducer implements AsyncResponseProducer {
     }
 
     @Override
-    public HttpResponse produceResponse() {
-        return response;
-    }
-
-    @Override
-    public EntityDetails getEntityDetails() {
-        return dataProducer;
+    public void sendResponse(final ResponseChannel responseChannel) throws HttpException, IOException {
+        responseChannel.sendResponse(response, dataProducer);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityConsumer.java
new file mode 100644
index 0000000..afd75ab
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityConsumer.java
@@ -0,0 +1,134 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.entity;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * @since 5.0
+ */
+public abstract class AbstractClassicEntityConsumer<T> implements AsyncEntityConsumer<T> {
+
+    private enum State { IDLE, ACTIVE, COMPLETED }
+
+    private final Executor executor;
+    private final SharedInputBuffer buffer;
+    private final AtomicReference<State> state;
+    private final AtomicReference<T> resultRef;
+    private final AtomicReference<Exception> exceptionRef;
+
+    public AbstractClassicEntityConsumer(final int initialBufferSize, final Executor executor) {
+        this.executor = Args.notNull(executor, "Executor");
+        this.buffer = new SharedInputBuffer(initialBufferSize);
+        this.state = new AtomicReference<>(State.IDLE);
+        this.resultRef = new AtomicReference<>(null);
+        this.exceptionRef = new AtomicReference<>(null);
+    }
+
+    protected abstract T consumeData(ContentType contentType, InputStream inputStream) throws IOException;
+
+    @Override
+    public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        buffer.updateCapacity(capacityChannel);
+    }
+
+    @Override
+    public final void streamStart(final EntityDetails entityDetails, final FutureCallback<T> resultCallback) throws HttpException, IOException {
+        final ContentType contentType;
+        try {
+            contentType = ContentType.parse(entityDetails.getContentType());
+        } catch (final UnsupportedCharsetException ex) {
+            throw new UnsupportedEncodingException(ex.getMessage());
+        }
+        if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        final T result = consumeData(contentType, new ContentInputStream(buffer));
+                        resultRef.set(result);
+                        resultCallback.completed(result);
+                    } catch (final Exception ex) {
+                        buffer.abort();
+                        resultCallback.failed(ex);
+                    } finally {
+                        state.set(State.COMPLETED);
+                    }
+                }
+
+            });
+        }
+    }
+
+    @Override
+    public final int consume(final ByteBuffer src) throws IOException {
+        return buffer.fill(src);
+    }
+
+    @Override
+    public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        buffer.markEndStream();
+    }
+
+    @Override
+    public final void failed(final Exception cause) {
+        if (exceptionRef.compareAndSet(null, cause)) {
+            releaseResources();
+        }
+    }
+
+    public final Exception getException() {
+        return exceptionRef.get();
+    }
+
+    @Override
+    public final T getContent() {
+        return resultRef.get();
+    }
+
+    @Override
+    public void releaseResources() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityProducer.java
new file mode 100644
index 0000000..5a4b19a
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityProducer.java
@@ -0,0 +1,130 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.entity;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * @since 5.0
+ */
+public abstract class AbstractClassicEntityProducer implements AsyncEntityProducer {
+
+    private enum State { IDLE, ACTIVE, COMPLETED }
+
+    private final SharedOutputBuffer buffer;
+    private final ContentType contentType;
+    private final Executor executor;
+    private final AtomicReference<State> state;
+    private final AtomicReference<Exception> exception;
+
+    public AbstractClassicEntityProducer(final int initialBufferSize, final ContentType contentType, final Executor executor) {
+        this.buffer = new SharedOutputBuffer(initialBufferSize);
+        this.contentType = contentType;
+        this.executor = Args.notNull(executor, "Executor");
+        this.state = new AtomicReference<>(State.IDLE);
+        this.exception = new AtomicReference<>(null);
+    }
+
+    protected abstract void produceData(ContentType contentType, OutputStream outputStream) throws IOException;
+
+    @Override
+    public final int available() {
+        return buffer.length();
+    }
+
+    @Override
+    public final void produce(final DataStreamChannel channel) throws IOException {
+        if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        produceData(contentType, new ContentOutputStream(buffer));
+                        buffer.writeCompleted();
+                    } catch (final Exception ex) {
+                        buffer.abort();
+                    } finally {
+                        state.set(State.COMPLETED);
+                    }
+                }
+
+            });
+        }
+        buffer.flush(channel);
+    }
+
+    @Override
+    public final long getContentLength() {
+        return -1;
+    }
+
+    @Override
+    public final String getContentType() {
+        return contentType != null ? contentType.toString() : null;
+    }
+
+    @Override
+    public String getContentEncoding() {
+        return null;
+    }
+
+    @Override
+    public final boolean isChunked() {
+        return false;
+    }
+
+    @Override
+    public final Set<String> getTrailerNames() {
+        return null;
+    }
+
+    @Override
+    public final void failed(final Exception cause) {
+        if (exception.compareAndSet(null, cause)) {
+            releaseResources();
+        }
+    }
+
+    public final Exception getException() {
+        return exception.get();
+    }
+
+    @Override
+    public void releaseResources() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractSharedBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractSharedBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractSharedBuffer.java
new file mode 100644
index 0000000..5339580
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractSharedBuffer.java
@@ -0,0 +1,119 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.entity;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.impl.nio.ExpandableBuffer;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Base class for buffers shared between threads. Each accessor overridden here
+ * acquires {@link #lock} before delegating to {@link ExpandableBuffer}, and
+ * {@link #abort()} signals {@link #condition} so that threads waiting on it can
+ * observe the aborted state.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+abstract class AbstractSharedBuffer extends ExpandableBuffer {
+
+    // Lock guarding access to the underlying buffer; supplied by the creator.
+    final ReentrantLock lock;
+    // Condition created from the lock above; signalled by abort().
+    final Condition condition;
+
+    // End-of-stream marker: set by abort(), cleared by reset(), and reported by
+    // isEndStream() once the buffer holds no more data.
+    volatile boolean endStream;
+    // Set by abort() and never cleared in this class; makes reset() a no-op.
+    volatile boolean aborted;
+
+    /**
+     * @param lock lock guarding this buffer; checked with {@code Args.notNull}.
+     * @param initialBufferSize initial size passed on to {@link ExpandableBuffer}.
+     */
+    public AbstractSharedBuffer(final ReentrantLock lock, final int initialBufferSize) {
+        super(initialBufferSize);
+        this.lock = Args.notNull(lock, "Lock");
+        this.condition = lock.newCondition();
+    }
+
+    /**
+     * Same as the inherited {@code hasData()}, evaluated while holding the lock.
+     */
+    @Override
+    public boolean hasData() {
+        lock.lock();
+        try {
+            return super.hasData();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Same as the inherited {@code capacity()}, evaluated while holding the lock.
+     */
+    @Override
+    public int capacity() {
+        lock.lock();
+        try {
+            return super.capacity();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Same as the inherited {@code length()}, evaluated while holding the lock.
+     */
+    @Override
+    public int length() {
+        lock.lock();
+        try {
+            return super.length();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Marks the buffer as aborted and ended, then wakes up every thread waiting
+     * on {@link #condition}.
+     */
+    public void abort() {
+        lock.lock();
+        try {
+            endStream = true;
+            aborted = true;
+            condition.signalAll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Clears the buffered content and the end-of-stream marker. Does nothing
+     * once the buffer has been aborted.
+     */
+    public void reset() {
+        // Checked without the lock; 'aborted' is volatile.
+        if (aborted) {
+            return;
+        }
+        lock.lock();
+        try {
+            setInputMode();
+            buffer().clear();
+            endStream = false;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @return {@code true} only when the end-of-stream marker is set and the
+     *         inherited {@code hasData()} reports no remaining data.
+     */
+    public boolean isEndStream() {
+        lock.lock();
+        try {
+            return endStream && !super.hasData();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedInputBuffer.java
new file mode 100644
index 0000000..7303f70
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedInputBuffer.java
@@ -0,0 +1,163 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.entity;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+
+/**
+ * Shared buffer through which one thread supplies data with
+ * {@link #fill(ByteBuffer)} while another thread consumes it with the blocking
+ * {@code read} methods. All state changes happen while holding the lock
+ * inherited from {@link AbstractSharedBuffer}.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
+
+    // Channel used to advertise free buffer space to the data source; set by
+    // updateCapacity() and cleared by markEndStream().
+    private volatile CapacityChannel capacityChannel;
+
+    /**
+     * @param lock lock guarding this buffer.
+     * @param initialBufferSize initial size of the underlying buffer.
+     */
+    public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
+        super(lock, initialBufferSize);
+    }
+
+    /**
+     * Creates a buffer guarded by a new private {@link ReentrantLock}.
+     *
+     * @param buffersize initial size of the underlying buffer.
+     */
+    public SharedInputBuffer(final int buffersize) {
+        super(new ReentrantLock(), buffersize);
+    }
+
+    /**
+     * Copies the remaining content of {@code src} into this buffer, after calling
+     * {@code ensureCapacity} for the combined size, and wakes up waiting readers.
+     *
+     * @param src data to be buffered.
+     * @return {@code buffer().remaining()} after the copy, i.e. the space left in
+     *         the buffer while in input mode.
+     * @throws IOException declared for callers; not thrown directly by this code.
+     */
+    public int fill(final ByteBuffer src) throws IOException {
+        lock.lock();
+        try {
+            setInputMode();
+            ensureCapacity(buffer().position() + src.remaining());
+            buffer().put(src);
+            final int remaining = buffer().remaining();
+            condition.signalAll();
+            return remaining;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Remembers the capacity channel and, if the buffer currently has free
+     * space, reports that space through {@link CapacityChannel#update(int)}.
+     *
+     * @param capacityChannel channel to report free space to.
+     * @throws IOException if the capacity update fails.
+     */
+    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        lock.lock();
+        try {
+            this.capacityChannel = capacityChannel;
+            setInputMode();
+            if (buffer().hasRemaining()) {
+                capacityChannel.update(buffer().remaining());
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Blocks until data is available, the stream has ended or the buffer has
+     * been aborted. Must be called with the lock held and the buffer in output
+     * mode; leaves the buffer in output mode.
+     *
+     * @throws InterruptedIOException if the waiting thread is interrupted; the
+     *         interrupt flag is restored and the original exception is the cause.
+     */
+    private void awaitInput() throws InterruptedIOException {
+        if (!buffer().hasRemaining()) {
+            setInputMode();
+            while (buffer().position() == 0 && !endStream && !aborted) {
+                try {
+                    condition.await();
+                } catch (final InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    // Keep the original exception as the cause for diagnostics.
+                    final InterruptedIOException ioex = new InterruptedIOException(ex.getMessage());
+                    ioex.initCause(ex);
+                    throw ioex;
+                }
+            }
+            setOutputMode();
+        }
+    }
+
+    /**
+     * Reads a single byte, blocking until one is available.
+     *
+     * @return the byte as a value in the range 0-255, or {@code -1} if the buffer
+     *         has been aborted or the stream has ended with no data left.
+     * @throws IOException if interrupted while waiting or if the capacity update
+     *         fails.
+     */
+    @Override
+    public int read() throws IOException {
+        lock.lock();
+        try {
+            setOutputMode();
+            awaitInput();
+            if (aborted) {
+                return -1;
+            }
+            if (!buffer().hasRemaining() && endStream) {
+                return -1;
+            }
+            final int b = buffer().get() & 0xff;
+            // Once drained, advertise the free space so more data can be sent.
+            if (!buffer().hasRemaining() && capacityChannel != null) {
+                setInputMode();
+                if (buffer().hasRemaining()) {
+                    capacityChannel.update(buffer().remaining());
+                }
+            }
+            return b;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b}, blocking until at least one
+     * byte is available.
+     *
+     * @param b destination array.
+     * @param off offset in {@code b} at which to start storing bytes.
+     * @param len maximum number of bytes to read.
+     * @return the number of bytes read; {@code 0} if {@code len} is zero;
+     *         {@code -1} if the buffer has been aborted or the stream has ended
+     *         with no data left.
+     * @throws IOException if interrupted while waiting or if the capacity update
+     *         fails.
+     */
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        // A zero-length request must return immediately instead of blocking
+        // until input arrives.
+        if (len == 0) {
+            return 0;
+        }
+        lock.lock();
+        try {
+            setOutputMode();
+            awaitInput();
+            if (aborted) {
+                return -1;
+            }
+            if (!buffer().hasRemaining() && endStream) {
+                return -1;
+            }
+            final int chunk = Math.min(buffer().remaining(), len);
+            buffer().get(b, off, chunk);
+            // Once drained, advertise the free space so more data can be sent.
+            if (!buffer().hasRemaining() && capacityChannel != null) {
+                setInputMode();
+                if (buffer().hasRemaining()) {
+                    capacityChannel.update(buffer().remaining());
+                }
+            }
+            return chunk;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Marks the end of the input stream, drops the capacity channel and wakes up
+     * waiting readers. Calling it again has no further effect.
+     *
+     * @throws IOException declared for callers; not thrown directly by this code.
+     */
+    public void markEndStream() throws IOException {
+        // Cheap check without the lock; re-checked below under the lock.
+        if (endStream) {
+            return;
+        }
+        lock.lock();
+        try {
+            if (!endStream) {
+                endStream = true;
+                capacityChannel = null;
+                condition.signalAll();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedOutputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedOutputBuffer.java
new file mode 100644
index 0000000..f5da5f7
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedOutputBuffer.java
@@ -0,0 +1,165 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.entity;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer {
+
+    private volatile DataStreamChannel dataStreamChannel;
+    private volatile boolean hasCapacity;
+
+    public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) {
+        super(lock, initialBufferSize);
+        this.hasCapacity = false;
+    }
+
+    public SharedOutputBuffer(final int buffersize) {
+        this(new ReentrantLock(), buffersize);
+    }
+
+    public void flush(final DataStreamChannel channel) throws IOException {
+        lock.lock();
+        try {
+            dataStreamChannel = channel;
+            hasCapacity = true;
+            setOutputMode();
+            if (buffer().hasRemaining()) {
+                dataStreamChannel.write(buffer());
+            }
+            if (!buffer().hasRemaining() && endStream) {
+                dataStreamChannel.endStream();
+            }
+            condition.signalAll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void ensureNotAborted() throws InterruptedIOException {
+        if (aborted) {
+            throw new InterruptedIOException("Operation aborted");
+        }
+    }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        final ByteBuffer src = ByteBuffer.wrap(b, off, len);
+        lock.lock();
+        try {
+            ensureNotAborted();
+            setInputMode();
+            while (src.hasRemaining()) {
+                // always buffer small chunks
+                if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) {
+                    buffer().put(src);
+                } else {
+                    if (buffer().position() > 0 || dataStreamChannel == null) {
+                        waitFlush();
+                    }
+                    if (buffer().position() == 0 && dataStreamChannel != null) {
+                        final int bytesWritten = dataStreamChannel.write(src);
+                        if (bytesWritten == 0) {
+                            hasCapacity = false;
+                            waitFlush();
+                        }
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        lock.lock();
+        try {
+            ensureNotAborted();
+            setInputMode();
+            if (!buffer().hasRemaining()) {
+                waitFlush();
+            }
+            buffer().put((byte)b);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void writeCompleted() throws IOException {
+        if (endStream) {
+            return;
+        }
+        lock.lock();
+        try {
+            if (!endStream) {
+                endStream = true;
+                if (dataStreamChannel != null) {
+                    setOutputMode();
+                    if (buffer().hasRemaining()) {
+                        dataStreamChannel.requestOutput();
+                    } else {
+                        dataStreamChannel.endStream();
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void waitFlush() throws InterruptedIOException {
+        setOutputMode();
+        if (dataStreamChannel != null) {
+            dataStreamChannel.requestOutput();
+        }
+        ensureNotAborted();
+        while (buffer().hasRemaining() || !hasCapacity) {
+            try {
+                condition.await();
+            } catch (final InterruptedException ex) {
+                Thread.currentThread().interrupt();
+                throw new InterruptedIOException(ex.getMessage());
+            }
+            ensureNotAborted();
+        }
+        setInputMode();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClassicServerExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClassicServerExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClassicServerExchangeHandler.java
new file mode 100644
index 0000000..75a9101
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClassicServerExchangeHandler.java
@@ -0,0 +1,302 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.support;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.message.HttpResponseWrapper;
+import org.apache.hc.core5.http.nio.HttpContextAware;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.nio.entity.ContentInputStream;
+import org.apache.hc.core5.http.nio.entity.ContentOutputStream;
+import org.apache.hc.core5.http.nio.entity.SharedInputBuffer;
+import org.apache.hc.core5.http.nio.entity.SharedOutputBuffer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Asserts;
+
+/**
+ * Server exchange handler that bridges the asynchronous callbacks of
+ * {@link AsyncServerExchangeHandler} to a blocking
+ * {@link #handle(HttpRequest, InputStream, HttpResponse, OutputStream, HttpContext)}
+ * method working with classic streams. The blocking method is run on the
+ * supplied {@link Executor}; request and response content is exchanged with it
+ * through a {@link SharedInputBuffer} and a {@link SharedOutputBuffer}.
+ *
+ * @since 5.0
+ */
+public abstract class AbstractClassicServerExchangeHandler implements HttpContextAware, AsyncServerExchangeHandler {
+
+    // IDLE until a request is handed to the executor, ACTIVE while handle()
+    // runs, COMPLETED once it has returned or thrown.
+    private enum State { IDLE, ACTIVE, COMPLETED }
+
+    private final int initialBufferSize;
+    private final Executor executor;
+    private final AtomicReference<State> state;
+    // First exception reported by handle() or through failed(); later ones are dropped.
+    private final AtomicReference<Exception> exception;
+
+    private volatile HttpContext context;
+    // Created in handleRequest(); stays null when the request has no entity.
+    private volatile SharedInputBuffer inputBuffer;
+    // Created in handleRequest().
+    private volatile SharedOutputBuffer outputBuffer;
+
+    /**
+     * @param initialBufferSize initial size of the input and output buffers;
+     *        checked with {@code Args.positive}.
+     * @param executor executor that runs the blocking handler; checked with
+     *        {@code Args.notNull}.
+     */
+    public AbstractClassicServerExchangeHandler(final int initialBufferSize, final Executor executor) {
+        this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
+        this.executor = Args.notNull(executor, "Executor");
+        this.exception = new AtomicReference<>(null);
+        this.state = new AtomicReference<>(State.IDLE);
+    }
+
+    /**
+     * @return the first exception recorded by this handler, or {@code null}.
+     */
+    public Exception getException() {
+        return exception.get();
+    }
+
+    @Override
+    public void setContext(final HttpContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Handles the request using blocking streams. Invoked on a thread of the
+     * executor passed to the constructor.
+     *
+     * @param request the request head.
+     * @param requestStream stream of the request content, or {@code null} when
+     *        the request has no entity.
+     * @param response the response head; its mutators are checked with
+     *        {@code Asserts.check} against changes after the response has been
+     *        committed.
+     * @param responseStream stream for the response content; the first write or
+     *        close commits the response head.
+     * @param context the context last set through {@link #setContext(HttpContext)}.
+     */
+    protected abstract void handle(
+            HttpRequest request, InputStream requestStream,
+            HttpResponse response, OutputStream responseStream,
+            HttpContext context) throws IOException, HttpException;
+
+    /**
+     * Sets up the shared buffers and streams for the exchange and schedules
+     * {@code handle} on the executor. The handler is scheduled only if the state
+     * can be moved from IDLE to ACTIVE.
+     */
+    @Override
+    public final void handleRequest(
+            final HttpRequest request,
+            final EntityDetails entityDetails,
+            final ResponseChannel responseChannel) throws HttpException, IOException {
+
+        // Acknowledge "Expect: 100-continue" before any request content is read.
+        if (entityDetails != null) {
+            final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
+            if (h != null && "100-continue".equalsIgnoreCase(h.getValue())) {
+                responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE));
+            }
+        }
+        // Flipped to true by the first write to, or close of, the response stream.
+        final AtomicBoolean responseCommitted = new AtomicBoolean(false);
+
+        final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
+        // Wrapper handed to handle(); each overridden mutator first checks that
+        // the response has not been committed yet.
+        final HttpResponse responseWrapper = new HttpResponseWrapper(response){
+
+            private void ensureNotCommitted() {
+                Asserts.check(!responseCommitted.get(), "Response already committed");
+            }
+
+            @Override
+            public void addHeader(final String name, final Object value) {
+                ensureNotCommitted();
+                super.addHeader(name, value);
+            }
+
+            @Override
+            public void setHeader(final String name, final Object value) {
+                ensureNotCommitted();
+                super.setHeader(name, value);
+            }
+
+            @Override
+            public void setVersion(final ProtocolVersion version) {
+                ensureNotCommitted();
+                super.setVersion(version);
+            }
+
+            @Override
+            public void setCode(final int code) {
+                ensureNotCommitted();
+                super.setCode(code);
+            }
+
+            @Override
+            public void setReasonPhrase(final String reason) {
+                ensureNotCommitted();
+                super.setReasonPhrase(reason);
+            }
+
+            @Override
+            public void setLocale(final Locale locale) {
+                ensureNotCommitted();
+                super.setLocale(locale);
+            }
+
+        };
+
+        // An input buffer and stream exist only when the request carries an entity.
+        final InputStream inputStream;
+        if (entityDetails != null) {
+            inputBuffer = new SharedInputBuffer(initialBufferSize);
+            inputStream = new ContentInputStream(inputBuffer);
+        } else {
+            inputStream = null;
+        }
+        outputBuffer = new SharedOutputBuffer(initialBufferSize);
+
+        // Response stream that submits the response head exactly once, right
+        // before the first content write or on close.
+        final OutputStream outputStream = new ContentOutputStream(outputBuffer) {
+
+            private void triggerResponse() throws IOException {
+                try {
+                    if (responseCommitted.compareAndSet(false, true)) {
+                        // Entity details are derived from the response headers
+                        // as they are when the details are queried.
+                        responseChannel.sendResponse(response, new EntityDetails() {
+
+                            @Override
+                            public long getContentLength() {
+                                return -1;
+                            }
+
+                            @Override
+                            public String getContentType() {
+                                final Header h = response.getFirstHeader(HttpHeaders.CONTENT_TYPE);
+                                return h != null ? h.getValue() : null;
+                            }
+
+                            @Override
+                            public String getContentEncoding() {
+                                final Header h = response.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
+                                return h != null ? h.getValue() : null;
+                            }
+
+                            @Override
+                            public boolean isChunked() {
+                                return false;
+                            }
+
+                            @Override
+                            public Set<String> getTrailerNames() {
+                                return null;
+                            }
+
+                        });
+                    }
+                } catch (final HttpException ex) {
+                    // Stream methods can only throw IOException; keep the cause.
+                    throw new IOException(ex.getMessage(), ex);
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                triggerResponse();
+                super.close();
+            }
+
+            @Override
+            public void write(final byte[] b, final int off, final int len) throws IOException {
+                triggerResponse();
+                super.write(b, off, len);
+            }
+
+            @Override
+            public void write(final byte[] b) throws IOException {
+                triggerResponse();
+                super.write(b);
+            }
+
+            @Override
+            public void write(final int b) throws IOException {
+                triggerResponse();
+                super.write(b);
+            }
+
+        };
+
+        if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        handle(request, inputStream, responseWrapper, outputStream, context);
+                        if (inputStream != null) {
+                            inputStream.close();
+                        }
+                        // Closing commits the response if handle() wrote nothing.
+                        outputStream.close();
+                    } catch (final Exception ex) {
+                        // Record the first failure and abort both buffers.
+                        exception.compareAndSet(null, ex);
+                        if (inputBuffer != null) {
+                            inputBuffer.abort();
+                        }
+                        outputBuffer.abort();
+                    } finally {
+                        state.set(State.COMPLETED);
+                    }
+                }
+
+            });
+        }
+    }
+
+    /**
+     * Passes the capacity channel on to the input buffer, if there is one.
+     */
+    @Override
+    public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        if (inputBuffer != null) {
+            inputBuffer.updateCapacity(capacityChannel);
+        }
+    }
+
+    /**
+     * Copies incoming request content into the input buffer.
+     *
+     * @return the value returned by {@link SharedInputBuffer#fill(ByteBuffer)}.
+     */
+    @Override
+    public final int consume(final ByteBuffer src) throws IOException {
+        Asserts.notNull(inputBuffer, "Input buffer");
+        return inputBuffer.fill(src);
+    }
+
+    /**
+     * Marks the end of the request content. The trailers are ignored.
+     */
+    @Override
+    public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        Asserts.notNull(inputBuffer, "Input buffer");
+        inputBuffer.markEndStream();
+    }
+
+    /**
+     * @return the value of {@code length()} of the output buffer.
+     */
+    @Override
+    public final int available() {
+        Asserts.notNull(outputBuffer, "Output buffer");
+        return outputBuffer.length();
+    }
+
+    /**
+     * Flushes buffered response content to the given channel.
+     */
+    @Override
+    public final void produce(final DataStreamChannel channel) throws IOException {
+        Asserts.notNull(outputBuffer, "Output buffer");
+        outputBuffer.flush(channel);
+    }
+
+    /**
+     * Records the cause, unless one is already recorded, and releases
+     * resources. {@link #releaseResources()} is invoked on every call.
+     */
+    @Override
+    public final void failed(final Exception cause) {
+        exception.compareAndSet(null, cause);
+        releaseResources();
+    }
+
+    /**
+     * Releases resources held by this handler. The implementation here is
+     * empty; subclasses may override it.
+     */
+    @Override
+    public void releaseResources() {
+    }
+
+}


Mime
View raw message