hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject [3/4] httpcomponents-core git commit: Shared I/O session and HTTP/2 connection pool implementations
Date Wed, 01 Nov 2017 09:19:14 GMT
Shared I/O session and HTTP/2 connection pool implementations


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/221f695c
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/221f695c
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/221f695c

Branch: refs/heads/master
Commit: 221f695c3660518eb941c9b8ed95ad69fc85048d
Parents: 3d80bec
Author: Oleg Kalnichevski <olegk@apache.org>
Authored: Sat Oct 28 13:02:25 2017 +0200
Committer: Oleg Kalnichevski <olegk@apache.org>
Committed: Wed Nov 1 09:59:59 2017 +0100

----------------------------------------------------------------------
 .../hc/core5/http2/nio/pool/H2ConnPool.java     | 156 ++++++++++
 .../hc/core5/concurrent/ComplexFuture.java      |  14 +
 .../hc/core5/reactor/AbstractIOSessionPool.java | 300 +++++++++++++++++++
 .../reactor/TestAbstractIOSessionPool.java      | 289 ++++++++++++++++++
 4 files changed, 759 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/221f695c/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java
new file mode 100644
index 0000000..700698a
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java
@@ -0,0 +1,156 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http2.nio.pool;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Future;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.function.Resolver;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.impl.DefaultAddressResolver;
+import org.apache.hc.core5.http.nio.command.ShutdownCommand;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.http2.nio.command.PingCommand;
+import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.AbstractIOSessionPool;
+import org.apache.hc.core5.reactor.ConnectionInitiator;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class H2ConnPool extends AbstractIOSessionPool<HttpHost> {
+
+    private final ConnectionInitiator connectionInitiator;
+    private final Resolver<HttpHost, InetSocketAddress> addressResolver;
+    private final TlsStrategy tlsStrategy;
+
+    private volatile TimeValue validateAfterInactivity;
+
+    /**
+     * Creates a pool that opens sessions through the given connection initiator.
+     *
+     * @param connectionInitiator initiator used to open new I/O sessions; checked
+     *   with {@code Args.notNull}.
+     * @param addressResolver resolver mapping a host to a socket address;
+     *   {@link DefaultAddressResolver#INSTANCE} is used when {@code null}.
+     * @param tlsStrategy strategy used to upgrade sessions for {@code https} hosts;
+     *   may be {@code null}, in which case no TLS upgrade is attempted.
+     */
+    public H2ConnPool(
+            final ConnectionInitiator connectionInitiator,
+            final Resolver<HttpHost, InetSocketAddress> addressResolver,
+            final TlsStrategy tlsStrategy) {
+        this.connectionInitiator = Args.notNull(connectionInitiator, "Connection initiator");
+        if (addressResolver == null) {
+            this.addressResolver = DefaultAddressResolver.INSTANCE;
+        } else {
+            this.addressResolver = addressResolver;
+        }
+        this.tlsStrategy = tlsStrategy;
+    }
+
+    /**
+     * Returns the inactivity period after which a pooled session is checked
+     * with a ping before being handed out; may be {@code null} if never set.
+     */
+    public TimeValue getValidateAfterInactivity() {
+        return validateAfterInactivity;
+    }
+
+    /**
+     * Sets the inactivity period after which a pooled session is checked with
+     * a ping before being handed out. Validation is only performed when the
+     * value is positive.
+     *
+     * @param timeValue the inactivity period.
+     */
+    public void setValidateAfterInactivity(final TimeValue timeValue) {
+        this.validateAfterInactivity = timeValue;
+    }
+
+    /**
+     * Closes the given session. A graceful close is requested by submitting a
+     * {@link ShutdownCommand} through {@code addFirst}; any other shutdown type
+     * is applied to the session directly.
+     *
+     * @param ioSession the session to close.
+     * @param shutdownType how the session should be closed.
+     */
+    @Override
+    protected void closeSession(
+            final IOSession ioSession,
+            final ShutdownType shutdownType) {
+        if (shutdownType != ShutdownType.GRACEFUL) {
+            ioSession.shutdown(shutdownType);
+            return;
+        }
+        ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+    }
+
+    /**
+     * Opens a new session to the given host.
+     * <p>
+     * The host is resolved to a socket address and the connect is delegated to the
+     * connection initiator. Once connected, the session is upgraded to TLS when a TLS
+     * strategy is configured, the host scheme is {@code https} and the session is a
+     * {@link TransportSecurityLayer}; in that case the socket timeout is also set to
+     * {@code requestTimeout}. Failure and cancellation are forwarded unchanged.
+     *
+     * @param namedEndpoint the host to connect to.
+     * @param requestTimeout timeout passed to the initiator and, after a TLS upgrade,
+     *   applied as the session socket timeout.
+     * @param callback notified of the connect outcome.
+     * @return the future returned by the connection initiator.
+     */
+    @Override
+    protected Future<IOSession> connectSession(
+            final HttpHost namedEndpoint,
+            final Timeout requestTimeout,
+            final FutureCallback<IOSession> callback) {
+        final InetSocketAddress remoteAddress = addressResolver.resolve(namedEndpoint);
+        return connectionInitiator.connect(namedEndpoint, remoteAddress, null, requestTimeout, null, new FutureCallback<IOSession>() {
+
+            @Override
+            public void completed(final IOSession ioSession) {
+                // Only https endpoints on TLS-capable sessions are upgraded.
+                if (tlsStrategy != null
+                        && URIScheme.HTTPS.same(namedEndpoint.getSchemeName())
+                        && ioSession instanceof TransportSecurityLayer) {
+                    tlsStrategy.upgrade(
+                            (TransportSecurityLayer) ioSession,
+                            namedEndpoint,
+                            ioSession.getLocalAddress(),
+                            ioSession.getRemoteAddress(),
+                            null);
+                    ioSession.setSocketTimeout(requestTimeout.toMillisIntBound());
+                }
+                callback.completed(ioSession);
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                callback.failed(ex);
+            }
+
+            @Override
+            public void cancelled() {
+                callback.cancelled();
+            }
+
+        });
+    }
+
+    /**
+     * Validates a pooled session before it is handed out.
+     * <p>
+     * When {@code validateAfterInactivity} is positive and the earlier of the
+     * session's last read and last write times plus that period is not after the
+     * current time, a {@link PingCommand} is submitted through {@code addLast} and
+     * the ping result is reported to the callback. In every other case the session
+     * is reported as valid immediately.
+     *
+     * @param ioSession the session to validate.
+     * @param callback receives {@code true} if the session may be used.
+     */
+    @Override
+    protected void validateSession(
+            final IOSession ioSession,
+            final Callback<Boolean> callback) {
+        final TimeValue timeValue = validateAfterInactivity;
+        if (TimeValue.isPositive(timeValue)) {
+            final long lastAccessTime = Math.min(ioSession.getLastReadTime(), ioSession.getLastWriteTime());
+            final long deadline = lastAccessTime + timeValue.toMillis();
+            if (deadline <= System.currentTimeMillis()) {
+                // Remember the current socket timeout so it can be restored once the ping
+                // has finished (presumably ping handling alters it - confirm in BasicPingHandler).
+                final int socketTimeout = ioSession.getSocketTimeout();
+                ioSession.addLast(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
+
+                    @Override
+                    public void execute(final Boolean result) {
+                        ioSession.setSocketTimeout(socketTimeout);
+                        callback.execute(result);
+                    }
+
+                })));
+                return;
+            }
+        }
+        callback.execute(true);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/221f695c/httpcore5/src/main/java/org/apache/hc/core5/concurrent/ComplexFuture.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/concurrent/ComplexFuture.java b/httpcore5/src/main/java/org/apache/hc/core5/concurrent/ComplexFuture.java
index 3b94528..c0c8adc 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/concurrent/ComplexFuture.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/concurrent/ComplexFuture.java
@@ -74,6 +74,20 @@ public final class ComplexFuture<T> extends BasicFuture<T> {
     }
 
     @Override
+    public boolean completed(final T result) {
+        final boolean completed = super.completed(result);
+        // The dependency is cleared unconditionally, whatever super.completed returned
+        // (presumably this drops the reference held in dependencyRef - confirm in clearDependency).
+        clearDependency();
+        return completed;
+    }
+
+    @Override
+    public boolean failed(final Exception exception) {
+        final boolean failed = super.failed(exception);
+        // The dependency is cleared unconditionally, whatever super.failed returned
+        // (presumably this drops the reference held in dependencyRef - confirm in clearDependency).
+        clearDependency();
+        return failed;
+    }
+
+    @Override
     public boolean cancel(final boolean mayInterruptIfRunning) {
         final boolean cancelled = super.cancel(mayInterruptIfRunning);
         final Cancellable dependency = dependencyRef.getAndSet(null);

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/221f695c/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java
new file mode 100644
index 0000000..adfa360
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java
@@ -0,0 +1,300 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactor;
+
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.ComplexFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.ConnectionClosedException;
+import org.apache.hc.core5.io.GracefullyCloseable;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Asserts;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public abstract class AbstractIOSessionPool<T> implements GracefullyCloseable {
+
+    private final ConcurrentMap<T, PoolEntry> sessionPool;
+    private final AtomicBoolean closed;
+
+    /**
+     * Creates an empty pool that has not been shut down.
+     */
+    public AbstractIOSessionPool() {
+        this.closed = new AtomicBoolean(false);
+        this.sessionPool = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Opens a new I/O session to the given endpoint.
+     * <p>
+     * The pool calls this while holding the lock of the endpoint's pool entry,
+     * and only when the entry has neither a usable session nor a pending connect.
+     * The outcome reported to the callback is passed on to every request queued
+     * for the endpoint.
+     *
+     * @param namedEndpoint the endpoint to connect to.
+     * @param requestTimeout the timeout supplied by the caller of {@code getSession}.
+     * @param callback notified of the connect outcome.
+     * @return a future representing the pending connect; it is cancelled on pool shutdown.
+     */
+    protected abstract Future<IOSession> connectSession(
+            T namedEndpoint,
+            Timeout requestTimeout,
+            FutureCallback<IOSession> callback);
+
+    /**
+     * Decides whether a session obtained from the pool may be handed out.
+     * <p>
+     * Reporting {@code true} completes the caller's request with the session;
+     * reporting {@code false} makes the pool close the session and request a new one.
+     *
+     * @param ioSession the session to validate.
+     * @param callback receives the validation result.
+     */
+    protected abstract void validateSession(
+            IOSession ioSession,
+            Callback<Boolean> callback);
+
+    /**
+     * Closes the given session. The pool calls this on shutdown, when an idle
+     * session is evicted and when a session failed validation.
+     *
+     * @param ioSession the session to close.
+     * @param shutdownType how the session should be closed.
+     */
+    protected abstract void closeSession(
+            IOSession ioSession,
+            ShutdownType shutdownType);
+
+    /**
+     * Shuts the pool down. Only the first call has any effect: for every pool
+     * entry the pooled session is closed with the given shutdown type, a pending
+     * connect is cancelled and all queued request callbacks are notified of
+     * cancellation. Finally all entries are removed.
+     *
+     * @param shutdownType how pooled sessions should be closed.
+     */
+    @Override
+    public final void shutdown(final ShutdownType shutdownType) {
+        if (!closed.compareAndSet(false, true)) {
+            return;
+        }
+        for (final PoolEntry entry : sessionPool.values()) {
+            synchronized (entry) {
+                final IOSession pooled = entry.session;
+                if (pooled != null) {
+                    closeSession(pooled, shutdownType);
+                    entry.session = null;
+                }
+                final Future<IOSession> pendingConnect = entry.sessionFuture;
+                if (pendingConnect != null) {
+                    pendingConnect.cancel(true);
+                    entry.sessionFuture = null;
+                }
+                // Drain whatever is still waiting for a session.
+                FutureCallback<IOSession> waiting;
+                while ((waiting = entry.requestQueue.poll()) != null) {
+                    waiting.cancelled();
+                }
+            }
+        }
+        sessionPool.clear();
+    }
+
+    /**
+     * Shuts the pool down gracefully; equivalent to
+     * {@code shutdown(ShutdownType.GRACEFUL)}.
+     */
+    @Override
+    public final void close() {
+        shutdown(ShutdownType.GRACEFUL);
+    }
+
+    /**
+     * Returns the pool entry for the endpoint, creating and registering one if
+     * none exists. If another thread registers an entry first, that entry is
+     * returned instead of the one created here.
+     */
+    PoolEntry getPoolEntry(final T endpoint) {
+        final PoolEntry existing = sessionPool.get(endpoint);
+        if (existing != null) {
+            return existing;
+        }
+        final PoolEntry candidate = new PoolEntry();
+        final PoolEntry winner = sessionPool.putIfAbsent(endpoint, candidate);
+        return winner != null ? winner : candidate;
+    }
+
+    /**
+     * Requests a session for the given endpoint.
+     * <p>
+     * A session obtained from the pool entry (pooled or newly connected) is passed
+     * to {@link #validateSession}. If validation succeeds the returned future is
+     * completed with it; otherwise the session is closed and a new one is requested,
+     * which is handed out without a second validation. Failure or cancellation of
+     * the underlying request fails or cancels the returned future.
+     *
+     * @param endpoint the endpoint; checked with {@code Args.notNull}.
+     * @param requestTimeout timeout passed to {@link #connectSession} if a connect is needed.
+     * @param callback optional callback handed to the returned future.
+     * @return a future that is completed with the session.
+     */
+    public final Future<IOSession> getSession(
+            final T endpoint,
+            final Timeout requestTimeout,
+            final FutureCallback<IOSession> callback) {
+        Args.notNull(endpoint, "Endpoint");
+        Asserts.check(!closed.get(), "Connection pool shut down");
+        final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
+        final PoolEntry poolEntry = getPoolEntry(endpoint);
+        getSessionInternal(poolEntry, false, endpoint, requestTimeout, new FutureCallback<IOSession>() {
+
+            @Override
+            public void completed(final IOSession ioSession) {
+                validateSession(ioSession, new Callback<Boolean>() {
+
+                    @Override
+                    public void execute(final Boolean result) {
+                        if (result) {
+                            future.completed(ioSession);
+                        } else {
+                            // Validation failed: force the pooled session to be replaced.
+                            getSessionInternal(poolEntry, true, endpoint, requestTimeout, new FutureCallback<IOSession>() {
+
+                                @Override
+                                public void completed(final IOSession ioSession) {
+                                    future.completed(ioSession);
+                                }
+
+                                @Override
+                                public void failed(final Exception ex) {
+                                    future.failed(ex);
+                                }
+
+                                @Override
+                                public void cancelled() {
+                                    future.cancel();
+                                }
+
+                            });
+                        }
+                    }
+
+                });
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                future.failed(ex);
+            }
+
+            @Override
+            public void cancelled() {
+                future.cancel();
+            }
+
+        });
+        return future;
+    }
+
+    private void getSessionInternal(
+            final PoolEntry poolEntry,
+            final boolean requestNew,
+            final T namedEndpoint,
+            final Timeout requestTimeout,
+            final FutureCallback<IOSession> callback) {
+        synchronized (poolEntry) {
+            if (poolEntry.session != null && requestNew) {
+                closeSession(poolEntry.session, ShutdownType.GRACEFUL);
+                poolEntry.session = null;
+            }
+            if (poolEntry.session != null && poolEntry.session.isClosed()) {
+                poolEntry.session = null;
+            }
+            if (poolEntry.session != null) {
+                callback.completed(poolEntry.session);
+            } else {
+                poolEntry.requestQueue.add(callback);
+                if (poolEntry.sessionFuture == null) {
+                    poolEntry.sessionFuture = connectSession(
+                            namedEndpoint,
+                            requestTimeout,
+                            new FutureCallback<IOSession>() {
+
+                                @Override
+                                public void completed(final IOSession result) {
+                                    synchronized (poolEntry) {
+                                        poolEntry.session = result;
+                                        poolEntry.sessionFuture = null;
+                                        for (;;) {
+                                            final FutureCallback<IOSession> callback
= poolEntry.requestQueue.poll();
+                                            if (callback != null) {
+                                                callback.completed(result);
+                                            } else {
+                                                break;
+                                            }
+                                        }
+                                    }
+                                }
+
+                                @Override
+                                public void failed(final Exception ex) {
+                                    synchronized (poolEntry) {
+                                        poolEntry.session = null;
+                                        poolEntry.sessionFuture = null;
+                                        for (;;) {
+                                            final FutureCallback<IOSession> callback
= poolEntry.requestQueue.poll();
+                                            if (callback != null) {
+                                                callback.failed(ex);
+                                            } else {
+                                                break;
+                                            }
+                                        }
+                                    }
+                                }
+
+                                @Override
+                                public void cancelled() {
+                                    failed(new ConnectionClosedException("Connection request
cancelled"));
+                                }
+
+                            });
+                }
+            }
+        }
+    }
+
+    /**
+     * Passes every pooled session to the callback, one entry at a time and
+     * while holding that entry's lock. A session found closed after the
+     * callback returns is removed from its entry.
+     *
+     * @param callback invoked once per pooled session.
+     */
+    public final void enumAvailable(final Callback<IOSession> callback) {
+        for (final PoolEntry entry : sessionPool.values()) {
+            // Cheap unlocked check first; repeated under the lock below.
+            if (entry.session == null) {
+                continue;
+            }
+            synchronized (entry) {
+                if (entry.session == null) {
+                    continue;
+                }
+                callback.execute(entry.session);
+                if (entry.session.isClosed()) {
+                    entry.session = null;
+                }
+            }
+        }
+    }
+
+    /**
+     * Gracefully closes and removes pooled sessions whose last read time is at
+     * or before the current time minus {@code idleTime}. A {@code null} or
+     * non-positive idle time is treated as zero. Only the last read time is
+     * consulted; the last write time is not taken into account.
+     *
+     * @param idleTime the maximum idle period.
+     */
+    public final void closeIdle(final TimeValue idleTime) {
+        final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMillis() : 0);
+        for (final PoolEntry poolEntry: sessionPool.values()) {
+            // Cheap unlocked check first; repeated under the lock below.
+            if (poolEntry.session != null) {
+                synchronized (poolEntry) {
+                    if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
+                        closeSession(poolEntry.session, ShutdownType.GRACEFUL);
+                        poolEntry.session = null;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns a copy of the set of endpoints that currently have a pool entry.
+     * Changes to the returned set do not affect the pool.
+     */
+    public final Set<T> getRoutes() {
+        return new HashSet<>(sessionPool.keySet());
+    }
+
+    /**
+     * Returns a short description containing the number of pool entries.
+     */
+    @Override
+    public String toString() {
+        return "I/O sessions: " + sessionPool.size();
+    }
+
+    /**
+     * Per-endpoint pool state: the pooled session (if any), the pending connect
+     * (if any) and the callbacks waiting for that connect to finish. The pool
+     * synchronizes on the entry itself when modifying it.
+     */
+    static class PoolEntry {
+
+        final Queue<FutureCallback<IOSession>> requestQueue = new ArrayDeque<>();
+        volatile Future<IOSession> sessionFuture;
+        volatile IOSession session;
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/221f695c/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestAbstractIOSessionPool.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestAbstractIOSessionPool.java b/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestAbstractIOSessionPool.java
new file mode 100644
index 0000000..1085bf2
--- /dev/null
+++ b/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestAbstractIOSessionPool.java
@@ -0,0 +1,289 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactor;
+
+import java.util.concurrent.Future;
+
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestAbstractIOSessionPool {
+
+    @Mock
+    private Future<IOSession> connectFuture;
+    @Mock
+    private FutureCallback<IOSession> callback1;
+    @Mock
+    private FutureCallback<IOSession> callback2;
+    @Mock
+    private IOSession ioSession1;
+    @Mock
+    private IOSession ioSession2;
+
+    private AbstractIOSessionPool<String> impl;
+
+    // Builds a partial mock of the abstract pool: the mock is created through the
+    // class's constructor and unstubbed methods call the real implementation, so
+    // the tests exercise the pool logic while stubbing the abstract hooks.
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setup() {
+        impl = Mockito.mock(AbstractIOSessionPool.class, Mockito.withSettings()
+                .defaultAnswer(Answers.CALLS_REAL_METHODS)
+                .useConstructor());
+    }
+
+    /**
+     * Two concurrent requests for the same endpoint share a single connect, both
+     * complete with the connected session, and a later request is served from
+     * the pool without connecting again.
+     */
+    @Test
+    public void testGetSessions() throws Exception {
+
+        Mockito.when(impl.connectSession(
+                Mockito.anyString(),
+                Mockito.<Timeout>any(),
+                Mockito.<FutureCallback<IOSession>>any())).thenReturn(connectFuture);
+
+        // Every session validates successfully.
+        Mockito.doAnswer(new Answer<Object>() {
+
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable {
+                final Callback<Boolean> callback = invocation.getArgument(1);
+                callback.execute(true);
+                return null;
+            }
+
+        }).when(impl).validateSession(Mockito.<IOSession>any(), Mockito.<Callback<Boolean>>any());
+
+        final Future<IOSession> future1 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+        Assert.assertThat(future1, CoreMatchers.notNullValue());
+        Assert.assertThat(future1.isDone(), CoreMatchers.equalTo(false));
+        Assert.assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));
+
+        Mockito.verify(impl).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.eq(Timeout.ofSeconds(123L)),
+                Mockito.<FutureCallback<IOSession>>any());
+
+        final Future<IOSession> future2 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+        Assert.assertThat(future2, CoreMatchers.notNullValue());
+        Assert.assertThat(future2.isDone(), CoreMatchers.equalTo(false));
+        Assert.assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));
+
+        // The matcher is used to get hold of the connect callback and complete it.
+        Mockito.verify(impl, Mockito.times(1)).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.<Timeout>any(),
+                Mockito.argThat(new ArgumentMatcher<FutureCallback<IOSession>>() {
+
+                    @Override
+                    public boolean matches(final FutureCallback<IOSession> callback) {
+                        callback.completed(ioSession1);
+                        return true;
+                    }
+
+                }));
+
+        Assert.assertThat(future1.isDone(), CoreMatchers.equalTo(true));
+        Assert.assertThat(future1.get(), CoreMatchers.sameInstance(ioSession1));
+
+        Assert.assertThat(future2.isDone(), CoreMatchers.equalTo(true));
+        Assert.assertThat(future2.get(), CoreMatchers.sameInstance(ioSession1));
+
+        Mockito.verify(impl, Mockito.times(2)).validateSession(Mockito.<IOSession>any(), Mockito.<Callback<Boolean>>any());
+
+        final Future<IOSession> future3 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+
+        Mockito.verify(impl, Mockito.times(1)).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.<Timeout>any(),
+                Mockito.<FutureCallback<IOSession>>any());
+
+        Mockito.verify(impl, Mockito.times(3)).validateSession(Mockito.<IOSession>any(), Mockito.<Callback<Boolean>>any());
+
+        Assert.assertThat(future3.isDone(), CoreMatchers.equalTo(true));
+        Assert.assertThat(future3.get(), CoreMatchers.sameInstance(ioSession1));
+    }
+
+    /**
+     * Two concurrent requests for the same endpoint share a single connect and
+     * both futures are done once that connect fails.
+     */
+    @Test
+    public void testGetSessionFailure() throws Exception {
+
+        Mockito.when(impl.connectSession(
+                Mockito.anyString(),
+                Mockito.<Timeout>any(),
+                Mockito.<FutureCallback<IOSession>>any())).thenReturn(connectFuture);
+
+        final Future<IOSession> future1 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+        Assert.assertThat(future1, CoreMatchers.notNullValue());
+        Assert.assertThat(future1.isDone(), CoreMatchers.equalTo(false));
+        Assert.assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));
+
+        Mockito.verify(impl).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.eq(Timeout.ofSeconds(123L)),
+                Mockito.<FutureCallback<IOSession>>any());
+
+        final Future<IOSession> future2 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+        Assert.assertThat(future2, CoreMatchers.notNullValue());
+        Assert.assertThat(future2.isDone(), CoreMatchers.equalTo(false));
+        Assert.assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));
+
+        // The matcher is used to get hold of the connect callback and fail it.
+        Mockito.verify(impl, Mockito.times(1)).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.<Timeout>any(),
+                Mockito.argThat(new ArgumentMatcher<FutureCallback<IOSession>>() {
+
+                    @Override
+                    public boolean matches(final FutureCallback<IOSession> callback) {
+                        callback.failed(new Exception("Boom"));
+                        return true;
+                    }
+
+                }));
+
+        Assert.assertThat(future1.isDone(), CoreMatchers.equalTo(true));
+        Assert.assertThat(future2.isDone(), CoreMatchers.equalTo(true));
+    }
+
+    /**
+     * Shutting the pool down closes pooled sessions, cancels a pending connect
+     * and cancels every queued request callback.
+     */
+    @Test
+    public void testShutdownPool() throws Exception {
+        final AbstractIOSessionPool.PoolEntry pooledA = impl.getPoolEntry("host1");
+        Assert.assertThat(pooledA, CoreMatchers.notNullValue());
+        pooledA.session = ioSession1;
+
+        final AbstractIOSessionPool.PoolEntry pooledB = impl.getPoolEntry("host2");
+        Assert.assertThat(pooledB, CoreMatchers.notNullValue());
+        pooledB.session = ioSession2;
+
+        // Third entry is still connecting, with two requests waiting.
+        final AbstractIOSessionPool.PoolEntry connecting = impl.getPoolEntry("host3");
+        Assert.assertThat(connecting, CoreMatchers.notNullValue());
+        connecting.sessionFuture = connectFuture;
+        connecting.requestQueue.add(callback1);
+        connecting.requestQueue.add(callback2);
+
+        impl.shutdown(ShutdownType.GRACEFUL);
+
+        Mockito.verify(connectFuture).cancel(Mockito.anyBoolean());
+        Mockito.verify(callback1).cancelled();
+        Mockito.verify(callback2).cancelled();
+        Mockito.verify(impl).closeSession(ioSession1, ShutdownType.GRACEFUL);
+        Mockito.verify(impl).closeSession(ioSession2, ShutdownType.GRACEFUL);
+    }
+
+    /**
+     * Closing idle sessions with a zero idle time closes every pooled session
+     * gracefully and removes it from its entry.
+     */
+    @Test
+    public void testCloseIdleSessions() throws Exception {
+        final AbstractIOSessionPool.PoolEntry pooledA = impl.getPoolEntry("host1");
+        Assert.assertThat(pooledA, CoreMatchers.notNullValue());
+        pooledA.session = ioSession1;
+
+        final AbstractIOSessionPool.PoolEntry pooledB = impl.getPoolEntry("host2");
+        Assert.assertThat(pooledB, CoreMatchers.notNullValue());
+        pooledB.session = ioSession2;
+
+        impl.closeIdle(TimeValue.ofMillis(0L));
+
+        Assert.assertThat(pooledA.session, CoreMatchers.nullValue());
+        Assert.assertThat(pooledB.session, CoreMatchers.nullValue());
+
+        Mockito.verify(impl).closeSession(ioSession1, ShutdownType.GRACEFUL);
+        Mockito.verify(impl).closeSession(ioSession2, ShutdownType.GRACEFUL);
+    }
+
+    /**
+     * Enumerating available sessions passes every pooled session to the callback.
+     */
+    @Test
+    public void testEnumSessions() throws Exception {
+        final AbstractIOSessionPool.PoolEntry pooledA = impl.getPoolEntry("host1");
+        Assert.assertThat(pooledA, CoreMatchers.notNullValue());
+        pooledA.session = ioSession1;
+
+        final AbstractIOSessionPool.PoolEntry pooledB = impl.getPoolEntry("host2");
+        Assert.assertThat(pooledB, CoreMatchers.notNullValue());
+        pooledB.session = ioSession2;
+
+        final Callback<IOSession> shutdownEach = new Callback<IOSession>() {
+
+            @Override
+            public void execute(final IOSession session) {
+                session.shutdown(ShutdownType.GRACEFUL);
+            }
+
+        };
+        impl.enumAvailable(shutdownEach);
+
+        Mockito.verify(ioSession1).shutdown(ShutdownType.GRACEFUL);
+        Mockito.verify(ioSession2).shutdown(ShutdownType.GRACEFUL);
+    }
+
+    /**
+     * A pooled session that fails validation causes exactly one new connect.
+     */
+    @Test
+    public void testGetSessionReconnectAfterValidate() throws Exception {
+        final AbstractIOSessionPool.PoolEntry entry1 = impl.getPoolEntry("somehost");
+        Assert.assertThat(entry1, CoreMatchers.notNullValue());
+        entry1.session = ioSession1;
+
+        Mockito.when(ioSession1.isClosed()).thenReturn(false);
+        // Every session fails validation.
+        Mockito.doAnswer(new Answer<Object>() {
+
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable {
+                final Callback<Boolean> callback = invocation.getArgument(1);
+                callback.execute(false);
+                return null;
+            }
+
+        }).when(impl).validateSession(Mockito.<IOSession>any(), Mockito.<Callback<Boolean>>any());
+
+        impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+
+        Mockito.verify(impl, Mockito.times(1)).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.eq(Timeout.ofSeconds(123L)),
+                Mockito.<FutureCallback<IOSession>>any());
+    }
+
+    /**
+     * A pooled session that reports itself closed is not reused; a new connect
+     * is started instead.
+     */
+    @Test
+    public void testGetSessionReconnectIfClosed() throws Exception {
+        Mockito.when(ioSession1.isClosed()).thenReturn(true);
+
+        final AbstractIOSessionPool.PoolEntry staleEntry = impl.getPoolEntry("somehost");
+        Assert.assertThat(staleEntry, CoreMatchers.notNullValue());
+        staleEntry.session = ioSession1;
+
+        final Timeout timeout = Timeout.ofSeconds(123L);
+        impl.getSession("somehost", timeout, null);
+
+        Mockito.verify(impl).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.eq(Timeout.ofSeconds(123L)),
+                Mockito.<FutureCallback<IOSession>>any());
+    }
+
+}


Mime
View raw message