hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r1773514 - in /httpcomponents/httpcore/trunk: httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/ httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/ httpcor...
Date Sat, 10 Dec 2016 10:27:44 GMT
Author: olegk
Date: Sat Dec 10 10:27:44 2016
New Revision: 1773514

URL: http://svn.apache.org/viewvc?rev=1773514&view=rev
Log:
RFC 7540: Ping support

Added:
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncPingHandler.java
  (with props)
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/PingCommand.java
  (with props)
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/BasicPingHandler.java
  (with props)
Modified:
    httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
    httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http2/Http2IntegrationTest.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpoint.java

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java?rev=1773514&r1=1773513&r2=1773514&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
Sat Dec 10 10:27:44 2016
@@ -36,8 +36,10 @@ import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -72,6 +74,8 @@ import org.apache.hc.core5.http2.frame.S
 import org.apache.hc.core5.http2.hpack.HPackDecoder;
 import org.apache.hc.core5.http2.hpack.HPackEncoder;
 import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
+import org.apache.hc.core5.http2.nio.AsyncPingHandler;
+import org.apache.hc.core5.http2.nio.command.PingCommand;
 import org.apache.hc.core5.net.InetAddressUtils;
 import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.IOSession;
@@ -101,6 +105,7 @@ abstract class AbstractHttp2StreamMultip
     private final HPackEncoder hPackEncoder;
     private final HPackDecoder hPackDecoder;
     private final Map<Integer, Http2Stream> streamMap;
+    private final Queue<AsyncPingHandler> pingHandlers;
     private final AtomicInteger connInputWindow;
     private final AtomicInteger connOutputWindow;
     private final Lock outputLock;
@@ -141,6 +146,7 @@ abstract class AbstractHttp2StreamMultip
         this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
         this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
         this.outputQueue = new ConcurrentLinkedDeque<>();
+        this.pingHandlers = new ConcurrentLinkedQueue<>();
         this.outputLock = new ReentrantLock();
         this.outputRequests = new AtomicInteger(0);
         this.lastStreamId = new AtomicInteger(0);
@@ -469,7 +475,6 @@ abstract class AbstractHttp2StreamMultip
             try {
                 if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
                     ioSession.close();
-                    cancelPendingCommands();
                 }
             } finally {
                 outputLock.unlock();
@@ -496,9 +501,29 @@ abstract class AbstractHttp2StreamMultip
     }
 
     public final void onDisconnect() {
-        cancelPendingCommands();
+        for (;;) {
+            final AsyncPingHandler pingHandler = pingHandlers.poll();
+            if (pingHandler != null) {
+                pingHandler.cancel();
+            } else {
+                break;
+            }
+        }
+        for (final Iterator<Map.Entry<Integer, Http2Stream>> it = streamMap.entrySet().iterator();
it.hasNext(); ) {
+            final Map.Entry<Integer, Http2Stream> entry = it.next();
+            final Http2Stream stream = entry.getValue();
+            stream.cancel();
+        }
+        for (;;) {
+            final Command command = ioSession.getCommand();
+            if (command != null) {
+                command.cancel();
+            } else {
+                break;
+            }
+        }
         if (connectionListener != null) {
-            connectionListener.onConnect(this);
+            connectionListener.onDisconnect(this);
         }
     }
 
@@ -553,34 +578,12 @@ abstract class AbstractHttp2StreamMultip
                 if (!outputQueue.isEmpty()) {
                     return;
                 }
-            }
-        }
-    }
-
-    private void cancelPendingCommands() {
-        for (;;) {
-            final Command command = ioSession.getCommand();
-            if (command != null) {
-                command.cancel();
-            } else {
-                break;
-            }
-        }
-    }
-
-    private void failPendingCommands(final Exception cause) {
-        for (;;) {
-            final Command command = ioSession.getCommand();
-            if (command != null) {
-                if (command instanceof ExecutionCommand) {
-                    final ExecutionCommand executionCommand = (ExecutionCommand) command;
-                    final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
-                    exchangeHandler.failed(cause);
-                } else {
-                    command.cancel();
-                }
-            } else {
-                break;
+            } else if (command instanceof PingCommand) {
+                final PingCommand pingCommand = (PingCommand) command;
+                final AsyncPingHandler handler = pingCommand.getHandler();
+                pingHandlers.add(handler);
+                final RawFrame ping = frameFactory.createPing(handler.getData());
+                commitFrame(ping);
             }
         }
     }
@@ -590,6 +593,34 @@ abstract class AbstractHttp2StreamMultip
             connectionListener.onError(this, cause);
         }
         try {
+            for (;;) {
+                final AsyncPingHandler pingHandler = pingHandlers.poll();
+                if (pingHandler != null) {
+                    pingHandler.failed(cause);
+                } else {
+                    break;
+                }
+            }
+            for (final Iterator<Map.Entry<Integer, Http2Stream>> it = streamMap.entrySet().iterator();
it.hasNext(); ) {
+                final Map.Entry<Integer, Http2Stream> entry = it.next();
+                final Http2Stream stream = entry.getValue();
+                stream.reset(cause);
+            }
+            streamMap.clear();
+            for (;;) {
+                final Command command = ioSession.getCommand();
+                if (command != null) {
+                    if (command instanceof ExecutionCommand) {
+                        final ExecutionCommand executionCommand = (ExecutionCommand) command;
+                        final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
+                        exchangeHandler.failed(cause);
+                    } else {
+                        command.cancel();
+                    }
+                } else {
+                    break;
+                }
+            }
             if (!(cause instanceof ConnectionClosedException)) {
                 final H2Error errorCode;
                 if (cause instanceof H2ConnectionException) {
@@ -602,13 +633,6 @@ abstract class AbstractHttp2StreamMultip
                 final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId,
errorCode, cause.getMessage());
                 commitFrame(goAway);
             }
-            for (final Iterator<Map.Entry<Integer, Http2Stream>> it = streamMap.entrySet().iterator();
it.hasNext(); ) {
-                final Map.Entry<Integer, Http2Stream> entry = it.next();
-                final Http2Stream stream = entry.getValue();
-                stream.reset(cause);
-            }
-            streamMap.clear();
-            failPendingCommands(cause);
             connState = ConnectionHandshake.SHUTDOWN;
         } catch (IOException ignore) {
         } finally {
@@ -774,9 +798,20 @@ abstract class AbstractHttp2StreamMultip
                 if (streamId != 0) {
                     throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream
id");
                 }
-                if (!frame.isFlagSet(FrameFlag.ACK)) {
-                    final ByteBuffer payload = frame.getPayload();
-                    final RawFrame response = frameFactory.createPingAck(payload);
+                final ByteBuffer ping = frame.getPayloadContent();
+                if (ping == null || ping.remaining() != 8) {
+                    throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING
frame payload");
+                }
+                if (frame.isFlagSet(FrameFlag.ACK)) {
+                    final AsyncPingHandler pingHandler = pingHandlers.poll();
+                    if (pingHandler != null) {
+                        pingHandler.consumeResponse(ping);
+                    }
+                } else {
+                    final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
+                    pong.put(ping);
+                    pong.flip();
+                    final RawFrame response = frameFactory.createPingAck(pong);
                     commitFrame(response);
                 }
             }

Added: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncPingHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncPingHandler.java?rev=1773514&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncPingHandler.java
(added)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncPingHandler.java
Sat Dec 10 10:27:44 2016
@@ -0,0 +1,49 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http2.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hc.core5.http.HttpException;
+
+/**
+ * Abstract asynchronous ping exchange handler.
+ *
+ * @since 5.0
+ */
+public interface AsyncPingHandler {
+
+    ByteBuffer getData();
+
+    void consumeResponse(ByteBuffer feedback) throws HttpException, IOException;
+
+    void failed(Exception cause);
+
+    void cancel();
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncPingHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncPingHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/AsyncPingHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/PingCommand.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/PingCommand.java?rev=1773514&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/PingCommand.java
(added)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/PingCommand.java
Sat Dec 10 10:27:44 2016
@@ -0,0 +1,57 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http2.nio.command;
+
+import org.apache.hc.core5.http2.nio.AsyncPingHandler;
+import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Ping command.
+ *
+ * @since 5.0
+ */
+public final class PingCommand implements Command {
+
+    private final AsyncPingHandler handler;
+
+    public PingCommand(final AsyncPingHandler handler) {
+        this.handler = Args.notNull(handler, "Handler");
+    }
+
+    public AsyncPingHandler getHandler() {
+        return handler;
+    }
+
+    @Override
+    public boolean cancel() {
+        handler.cancel();
+        return true;
+    }
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/PingCommand.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/PingCommand.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/command/PingCommand.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/BasicPingHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/BasicPingHandler.java?rev=1773514&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/BasicPingHandler.java
(added)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/BasicPingHandler.java
Sat Dec 10 10:27:44 2016
@@ -0,0 +1,81 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http2.nio.support;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http2.nio.AsyncPingHandler;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * @since 5.0
+ */
+public class BasicPingHandler implements AsyncPingHandler {
+
+    private static final byte[] PING_MESSAGE = new byte[] {'*', '*', 'p', 'i', 'n', 'g',
'*', '*'};
+
+    private final Callback<Boolean> callback;
+
+    public BasicPingHandler(final Callback<Boolean> callback) {
+        this.callback = Args.notNull(callback, "Callback");
+    }
+
+    @Override
+    public ByteBuffer getData() {
+        return ByteBuffer.wrap(PING_MESSAGE);
+    }
+
+    @Override
+    public void consumeResponse(final ByteBuffer feedback) throws HttpException, IOException
{
+        boolean result = true;
+        for (int i = 0; i < PING_MESSAGE.length; i++) {
+            final byte b = feedback.get();
+            System.out.println("PING " + b);
+//            if (!feedback.hasRemaining() || PING_MESSAGE[i] != b) {
+            if (PING_MESSAGE[i] != b) {
+                System.out.println("MISMATCH");
+                result = false;
+                break;
+            }
+        }
+        callback.execute(result);
+    }
+
+    @Override
+    public void failed(final Exception cause) {
+        callback.execute(Boolean.FALSE);
+    }
+
+    @Override
+    public void cancel() {
+        callback.execute(Boolean.FALSE);
+    }
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/BasicPingHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/BasicPingHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/BasicPingHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http2/Http2IntegrationTest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http2/Http2IntegrationTest.java?rev=1773514&r1=1773513&r2=1773514&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http2/Http2IntegrationTest.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http2/Http2IntegrationTest.java
Sat Dec 10 10:27:44 2016
@@ -51,12 +51,16 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.StringTokenizer;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
@@ -66,7 +70,6 @@ import org.apache.hc.core5.http.HttpRequ
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.Message;
-import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.impl.nio.AbstractClassicServerExchangeHandler;
 import org.apache.hc.core5.http.impl.nio.bootstrap.ClientEndpoint;
 import org.apache.hc.core5.http.impl.nio.entity.AbstractClassicEntityConsumer;
@@ -97,6 +100,8 @@ import org.apache.hc.core5.http.protocol
 import org.apache.hc.core5.http2.H2Error;
 import org.apache.hc.core5.http2.H2StreamResetException;
 import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.nio.command.PingCommand;
+import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.testing.ProtocolScheme;
 import org.apache.hc.core5.testing.nio.http.EchoHandler;
@@ -910,4 +915,46 @@ public class Http2IntegrationTest extend
         Assert.assertEquals(digest, map.get("digest"));
     }
 
+    @Test
+    public void testConnectionPing() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi there");
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final int n = 10;
+        final CountDownLatch latch = new CountDownLatch(n);
+        final AtomicInteger count = new AtomicInteger(0);
+        final Queue<Future<String>> queue = new LinkedList<>();
+        for (int i = 0; i < n; i++) {
+            streamEndpoint.execute(
+                    new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello")),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+            streamEndpoint.execute(new PingCommand(new BasicPingHandler(new Callback<Boolean>()
{
+
+                @Override
+                public void execute(final Boolean result) {
+                    if (result) {
+                        count.incrementAndGet();
+                    }
+                    latch.countDown();
+                }
+
+            })));
+
+        }
+        Assert.assertTrue(latch.await(TIMEOUT, TimeUnit.SECONDS));
+        Assert.assertEquals(n, count.get());
+    }
+
 }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpoint.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpoint.java?rev=1773514&r1=1773513&r2=1773514&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpoint.java
(original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpoint.java
Sat Dec 10 10:27:44 2016
@@ -66,6 +66,13 @@ public final class ClientEndpoint implem
         this.closed = new AtomicBoolean(false);
     }
 
+    public void execute(final Command command) {
+        ioSession.addLast(command);
+        if (ioSession.isClosed()) {
+            command.cancel();
+        }
+    }
+
     public void execute(
             final AsyncClientExchangeHandler exchangeHandler,
             final HttpContext context) {
@@ -73,10 +80,7 @@ public final class ClientEndpoint implem
         final Command executionCommand = new ExecutionCommand(
                 exchangeHandler,
                 context != null ? context : HttpCoreContext.create());
-        ioSession.addLast(executionCommand);
-        if (ioSession.isClosed()) {
-            executionCommand.cancel();
-        }
+        execute(executionCommand);
     }
 
     public <T> Future<T> execute(



Mime
View raw message